1package pubsub 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "math/rand" 9 "sync" 10 "testing" 11 "time" 12 13 pb "github.com/libp2p/go-libp2p-pubsub/pb" 14 15 "github.com/libp2p/go-libp2p-core/host" 16 "github.com/libp2p/go-libp2p-core/network" 17 "github.com/libp2p/go-libp2p-core/peer" 18 "github.com/libp2p/go-libp2p-core/peerstore" 19 "github.com/libp2p/go-libp2p-core/record" 20 21 bhost "github.com/libp2p/go-libp2p-blankhost" 22 swarmt "github.com/libp2p/go-libp2p-swarm/testing" 23 24 "github.com/libp2p/go-msgio/protoio" 25) 26 27func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub { 28 ps, err := NewGossipSub(ctx, h, opts...) 29 if err != nil { 30 panic(err) 31 } 32 return ps 33} 34 35func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { 36 var psubs []*PubSub 37 for _, h := range hs { 38 psubs = append(psubs, getGossipsub(ctx, h, opts...)) 39 } 40 return psubs 41} 42 43func TestSparseGossipsub(t *testing.T) { 44 ctx, cancel := context.WithCancel(context.Background()) 45 defer cancel() 46 hosts := getNetHosts(t, ctx, 20) 47 48 psubs := getGossipsubs(ctx, hosts) 49 50 var msgs []*Subscription 51 for _, ps := range psubs { 52 subch, err := ps.Subscribe("foobar") 53 if err != nil { 54 t.Fatal(err) 55 } 56 57 msgs = append(msgs, subch) 58 } 59 60 sparseConnect(t, hosts) 61 62 // wait for heartbeats to build mesh 63 time.Sleep(time.Second * 2) 64 65 for i := 0; i < 100; i++ { 66 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 67 68 owner := rand.Intn(len(psubs)) 69 70 psubs[owner].Publish("foobar", msg) 71 72 for _, sub := range msgs { 73 got, err := sub.Next(ctx) 74 if err != nil { 75 t.Fatal(sub.err) 76 } 77 if !bytes.Equal(msg, got.Data) { 78 t.Fatal("got wrong message!") 79 } 80 } 81 } 82} 83 84func TestDenseGossipsub(t *testing.T) { 85 ctx, cancel := context.WithCancel(context.Background()) 86 defer cancel() 87 hosts := getNetHosts(t, ctx, 20) 88 89 psubs := getGossipsubs(ctx, hosts) 90 
91 var msgs []*Subscription 92 for _, ps := range psubs { 93 subch, err := ps.Subscribe("foobar") 94 if err != nil { 95 t.Fatal(err) 96 } 97 98 msgs = append(msgs, subch) 99 } 100 101 denseConnect(t, hosts) 102 103 // wait for heartbeats to build mesh 104 time.Sleep(time.Second * 2) 105 106 for i := 0; i < 100; i++ { 107 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 108 109 owner := rand.Intn(len(psubs)) 110 111 psubs[owner].Publish("foobar", msg) 112 113 for _, sub := range msgs { 114 got, err := sub.Next(ctx) 115 if err != nil { 116 t.Fatal(sub.err) 117 } 118 if !bytes.Equal(msg, got.Data) { 119 t.Fatal("got wrong message!") 120 } 121 } 122 } 123} 124 125func TestGossipsubFanout(t *testing.T) { 126 ctx, cancel := context.WithCancel(context.Background()) 127 defer cancel() 128 hosts := getNetHosts(t, ctx, 20) 129 130 psubs := getGossipsubs(ctx, hosts) 131 132 var msgs []*Subscription 133 for _, ps := range psubs[1:] { 134 subch, err := ps.Subscribe("foobar") 135 if err != nil { 136 t.Fatal(err) 137 } 138 139 msgs = append(msgs, subch) 140 } 141 142 denseConnect(t, hosts) 143 144 // wait for heartbeats to build mesh 145 time.Sleep(time.Second * 2) 146 147 for i := 0; i < 100; i++ { 148 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 149 150 owner := 0 151 152 psubs[owner].Publish("foobar", msg) 153 154 for _, sub := range msgs { 155 got, err := sub.Next(ctx) 156 if err != nil { 157 t.Fatal(sub.err) 158 } 159 if !bytes.Equal(msg, got.Data) { 160 t.Fatal("got wrong message!") 161 } 162 } 163 } 164 165 // subscribe the owner 166 subch, err := psubs[0].Subscribe("foobar") 167 if err != nil { 168 t.Fatal(err) 169 } 170 msgs = append(msgs, subch) 171 172 // wait for a heartbeat 173 time.Sleep(time.Second * 1) 174 175 for i := 0; i < 100; i++ { 176 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 177 178 owner := 0 179 180 psubs[owner].Publish("foobar", msg) 181 182 for _, sub := range msgs { 183 got, err := sub.Next(ctx) 
184 if err != nil { 185 t.Fatal(sub.err) 186 } 187 if !bytes.Equal(msg, got.Data) { 188 t.Fatal("got wrong message!") 189 } 190 } 191 } 192} 193 194func TestGossipsubFanoutMaintenance(t *testing.T) { 195 ctx, cancel := context.WithCancel(context.Background()) 196 defer cancel() 197 hosts := getNetHosts(t, ctx, 20) 198 199 psubs := getGossipsubs(ctx, hosts) 200 201 var msgs []*Subscription 202 for _, ps := range psubs[1:] { 203 subch, err := ps.Subscribe("foobar") 204 if err != nil { 205 t.Fatal(err) 206 } 207 208 msgs = append(msgs, subch) 209 } 210 211 denseConnect(t, hosts) 212 213 // wait for heartbeats to build mesh 214 time.Sleep(time.Second * 2) 215 216 for i := 0; i < 100; i++ { 217 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 218 219 owner := 0 220 221 psubs[owner].Publish("foobar", msg) 222 223 for _, sub := range msgs { 224 got, err := sub.Next(ctx) 225 if err != nil { 226 t.Fatal(sub.err) 227 } 228 if !bytes.Equal(msg, got.Data) { 229 t.Fatal("got wrong message!") 230 } 231 } 232 } 233 234 // unsubscribe all peers to exercise fanout maintenance 235 for _, sub := range msgs { 236 sub.Cancel() 237 } 238 msgs = nil 239 240 // wait for heartbeats 241 time.Sleep(time.Second * 2) 242 243 // resubscribe and repeat 244 for _, ps := range psubs[1:] { 245 subch, err := ps.Subscribe("foobar") 246 if err != nil { 247 t.Fatal(err) 248 } 249 250 msgs = append(msgs, subch) 251 } 252 253 time.Sleep(time.Second * 2) 254 255 for i := 0; i < 100; i++ { 256 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 257 258 owner := 0 259 260 psubs[owner].Publish("foobar", msg) 261 262 for _, sub := range msgs { 263 got, err := sub.Next(ctx) 264 if err != nil { 265 t.Fatal(sub.err) 266 } 267 if !bytes.Equal(msg, got.Data) { 268 t.Fatal("got wrong message!") 269 } 270 } 271 } 272} 273 274func TestGossipsubFanoutExpiry(t *testing.T) { 275 GossipSubFanoutTTL = 1 * time.Second 276 defer func() { 277 GossipSubFanoutTTL = 60 * time.Second 278 }() 279 280 
ctx, cancel := context.WithCancel(context.Background()) 281 defer cancel() 282 hosts := getNetHosts(t, ctx, 10) 283 284 psubs := getGossipsubs(ctx, hosts) 285 286 var msgs []*Subscription 287 for _, ps := range psubs[1:] { 288 subch, err := ps.Subscribe("foobar") 289 if err != nil { 290 t.Fatal(err) 291 } 292 293 msgs = append(msgs, subch) 294 } 295 296 denseConnect(t, hosts) 297 298 // wait for heartbeats to build mesh 299 time.Sleep(time.Second * 2) 300 301 for i := 0; i < 5; i++ { 302 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 303 304 owner := 0 305 306 psubs[owner].Publish("foobar", msg) 307 308 for _, sub := range msgs { 309 got, err := sub.Next(ctx) 310 if err != nil { 311 t.Fatal(sub.err) 312 } 313 if !bytes.Equal(msg, got.Data) { 314 t.Fatal("got wrong message!") 315 } 316 } 317 } 318 319 psubs[0].eval <- func() { 320 if len(psubs[0].rt.(*GossipSubRouter).fanout) == 0 { 321 t.Fatal("owner has no fanout") 322 } 323 } 324 325 // wait for TTL to expire fanout peers in owner 326 time.Sleep(time.Second * 2) 327 328 psubs[0].eval <- func() { 329 if len(psubs[0].rt.(*GossipSubRouter).fanout) > 0 { 330 t.Fatal("fanout hasn't expired") 331 } 332 } 333 334 // wait for it to run in the event loop 335 time.Sleep(10 * time.Millisecond) 336} 337 338func TestGossipsubGossip(t *testing.T) { 339 ctx, cancel := context.WithCancel(context.Background()) 340 defer cancel() 341 hosts := getNetHosts(t, ctx, 20) 342 343 psubs := getGossipsubs(ctx, hosts) 344 345 var msgs []*Subscription 346 for _, ps := range psubs { 347 subch, err := ps.Subscribe("foobar") 348 if err != nil { 349 t.Fatal(err) 350 } 351 352 msgs = append(msgs, subch) 353 } 354 355 denseConnect(t, hosts) 356 357 // wait for heartbeats to build mesh 358 time.Sleep(time.Second * 2) 359 360 for i := 0; i < 100; i++ { 361 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 362 363 owner := rand.Intn(len(psubs)) 364 365 psubs[owner].Publish("foobar", msg) 366 367 for _, sub := range msgs 
{ 368 got, err := sub.Next(ctx) 369 if err != nil { 370 t.Fatal(sub.err) 371 } 372 if !bytes.Equal(msg, got.Data) { 373 t.Fatal("got wrong message!") 374 } 375 } 376 377 // wait a bit to have some gossip interleaved 378 time.Sleep(time.Millisecond * 100) 379 } 380 381 // and wait for some gossip flushing 382 time.Sleep(time.Second * 2) 383} 384 385func TestGossipsubGossipPiggyback(t *testing.T) { 386 t.Skip("test no longer relevant; gossip propagation has become eager") 387 ctx, cancel := context.WithCancel(context.Background()) 388 defer cancel() 389 hosts := getNetHosts(t, ctx, 20) 390 391 psubs := getGossipsubs(ctx, hosts) 392 393 var msgs []*Subscription 394 for _, ps := range psubs { 395 subch, err := ps.Subscribe("foobar") 396 if err != nil { 397 t.Fatal(err) 398 } 399 400 msgs = append(msgs, subch) 401 } 402 403 var xmsgs []*Subscription 404 for _, ps := range psubs { 405 subch, err := ps.Subscribe("bazcrux") 406 if err != nil { 407 t.Fatal(err) 408 } 409 410 xmsgs = append(xmsgs, subch) 411 } 412 413 denseConnect(t, hosts) 414 415 // wait for heartbeats to build mesh 416 time.Sleep(time.Second * 2) 417 418 for i := 0; i < 100; i++ { 419 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 420 421 owner := rand.Intn(len(psubs)) 422 423 psubs[owner].Publish("foobar", msg) 424 psubs[owner].Publish("bazcrux", msg) 425 426 for _, sub := range msgs { 427 got, err := sub.Next(ctx) 428 if err != nil { 429 t.Fatal(sub.err) 430 } 431 if !bytes.Equal(msg, got.Data) { 432 t.Fatal("got wrong message!") 433 } 434 } 435 436 for _, sub := range xmsgs { 437 got, err := sub.Next(ctx) 438 if err != nil { 439 t.Fatal(sub.err) 440 } 441 if !bytes.Equal(msg, got.Data) { 442 t.Fatal("got wrong message!") 443 } 444 } 445 446 // wait a bit to have some gossip interleaved 447 time.Sleep(time.Millisecond * 100) 448 } 449 450 // and wait for some gossip flushing 451 time.Sleep(time.Second * 2) 452} 453 454func TestGossipsubGossipPropagation(t *testing.T) { 455 ctx, cancel 
:= context.WithCancel(context.Background()) 456 defer cancel() 457 458 hosts := getNetHosts(t, ctx, 20) 459 psubs := getGossipsubs(ctx, hosts) 460 461 hosts1 := hosts[:GossipSubD+1] 462 hosts2 := append(hosts[GossipSubD+1:], hosts[0]) 463 464 denseConnect(t, hosts1) 465 denseConnect(t, hosts2) 466 467 var msgs1 []*Subscription 468 for _, ps := range psubs[1 : GossipSubD+1] { 469 subch, err := ps.Subscribe("foobar") 470 if err != nil { 471 t.Fatal(err) 472 } 473 474 msgs1 = append(msgs1, subch) 475 } 476 477 time.Sleep(time.Second * 1) 478 479 for i := 0; i < 10; i++ { 480 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 481 482 owner := 0 483 484 psubs[owner].Publish("foobar", msg) 485 486 for _, sub := range msgs1 { 487 got, err := sub.Next(ctx) 488 if err != nil { 489 t.Fatal(sub.err) 490 } 491 if !bytes.Equal(msg, got.Data) { 492 t.Fatal("got wrong message!") 493 } 494 } 495 } 496 497 time.Sleep(time.Millisecond * 100) 498 499 var msgs2 []*Subscription 500 for _, ps := range psubs[GossipSubD+1:] { 501 subch, err := ps.Subscribe("foobar") 502 if err != nil { 503 t.Fatal(err) 504 } 505 506 msgs2 = append(msgs2, subch) 507 } 508 509 var collect [][]byte 510 for i := 0; i < 10; i++ { 511 for _, sub := range msgs2 { 512 got, err := sub.Next(ctx) 513 if err != nil { 514 t.Fatal(sub.err) 515 } 516 collect = append(collect, got.Data) 517 } 518 } 519 520 for i := 0; i < 10; i++ { 521 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 522 gotit := false 523 for j := 0; j < len(collect); j++ { 524 if bytes.Equal(msg, collect[j]) { 525 gotit = true 526 break 527 } 528 } 529 if !gotit { 530 t.Fatalf("Didn't get message %s", string(msg)) 531 } 532 } 533} 534 535func TestGossipsubPrune(t *testing.T) { 536 ctx, cancel := context.WithCancel(context.Background()) 537 defer cancel() 538 hosts := getNetHosts(t, ctx, 20) 539 540 psubs := getGossipsubs(ctx, hosts) 541 542 var msgs []*Subscription 543 for _, ps := range psubs { 544 subch, err := 
ps.Subscribe("foobar") 545 if err != nil { 546 t.Fatal(err) 547 } 548 549 msgs = append(msgs, subch) 550 } 551 552 denseConnect(t, hosts) 553 554 // wait for heartbeats to build mesh 555 time.Sleep(time.Second * 2) 556 557 // disconnect some peers from the mesh to get some PRUNEs 558 for _, sub := range msgs[:5] { 559 sub.Cancel() 560 } 561 562 // wait a bit to take effect 563 time.Sleep(time.Millisecond * 100) 564 565 for i := 0; i < 10; i++ { 566 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 567 568 owner := rand.Intn(len(psubs)) 569 570 psubs[owner].Publish("foobar", msg) 571 572 for _, sub := range msgs[5:] { 573 got, err := sub.Next(ctx) 574 if err != nil { 575 t.Fatal(sub.err) 576 } 577 if !bytes.Equal(msg, got.Data) { 578 t.Fatal("got wrong message!") 579 } 580 } 581 } 582} 583 584func TestGossipsubGraft(t *testing.T) { 585 ctx, cancel := context.WithCancel(context.Background()) 586 defer cancel() 587 hosts := getNetHosts(t, ctx, 20) 588 589 psubs := getGossipsubs(ctx, hosts) 590 591 sparseConnect(t, hosts) 592 593 time.Sleep(time.Second * 1) 594 595 var msgs []*Subscription 596 for _, ps := range psubs { 597 subch, err := ps.Subscribe("foobar") 598 if err != nil { 599 t.Fatal(err) 600 } 601 602 msgs = append(msgs, subch) 603 604 // wait for announce to propagate 605 time.Sleep(time.Millisecond * 100) 606 } 607 608 time.Sleep(time.Second * 1) 609 610 for i := 0; i < 100; i++ { 611 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 612 613 owner := rand.Intn(len(psubs)) 614 615 psubs[owner].Publish("foobar", msg) 616 617 for _, sub := range msgs { 618 got, err := sub.Next(ctx) 619 if err != nil { 620 t.Fatal(sub.err) 621 } 622 if !bytes.Equal(msg, got.Data) { 623 t.Fatal("got wrong message!") 624 } 625 } 626 } 627} 628 629func TestGossipsubRemovePeer(t *testing.T) { 630 ctx, cancel := context.WithCancel(context.Background()) 631 defer cancel() 632 hosts := getNetHosts(t, ctx, 20) 633 634 psubs := getGossipsubs(ctx, hosts) 635 
636 var msgs []*Subscription 637 for _, ps := range psubs { 638 subch, err := ps.Subscribe("foobar") 639 if err != nil { 640 t.Fatal(err) 641 } 642 643 msgs = append(msgs, subch) 644 } 645 646 denseConnect(t, hosts) 647 648 // wait for heartbeats to build mesh 649 time.Sleep(time.Second * 2) 650 651 // disconnect some peers to exercise RemovePeer paths 652 for _, host := range hosts[:5] { 653 host.Close() 654 } 655 656 // wait a heartbeat 657 time.Sleep(time.Second * 1) 658 659 for i := 0; i < 10; i++ { 660 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 661 662 owner := 5 + rand.Intn(len(psubs)-5) 663 664 psubs[owner].Publish("foobar", msg) 665 666 for _, sub := range msgs[5:] { 667 got, err := sub.Next(ctx) 668 if err != nil { 669 t.Fatal(sub.err) 670 } 671 if !bytes.Equal(msg, got.Data) { 672 t.Fatal("got wrong message!") 673 } 674 } 675 } 676} 677 678func TestGossipsubGraftPruneRetry(t *testing.T) { 679 ctx, cancel := context.WithCancel(context.Background()) 680 defer cancel() 681 682 hosts := getNetHosts(t, ctx, 10) 683 psubs := getGossipsubs(ctx, hosts) 684 denseConnect(t, hosts) 685 686 var topics []string 687 var msgs [][]*Subscription 688 for i := 0; i < 35; i++ { 689 topic := fmt.Sprintf("topic%d", i) 690 topics = append(topics, topic) 691 692 var subs []*Subscription 693 for _, ps := range psubs { 694 subch, err := ps.Subscribe(topic) 695 if err != nil { 696 t.Fatal(err) 697 } 698 699 subs = append(subs, subch) 700 } 701 msgs = append(msgs, subs) 702 } 703 704 // wait for heartbeats to build meshes 705 time.Sleep(time.Second * 5) 706 707 for i, topic := range topics { 708 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 709 710 owner := rand.Intn(len(psubs)) 711 712 psubs[owner].Publish(topic, msg) 713 714 for _, sub := range msgs[i] { 715 got, err := sub.Next(ctx) 716 if err != nil { 717 t.Fatal(sub.err) 718 } 719 if !bytes.Equal(msg, got.Data) { 720 t.Fatal("got wrong message!") 721 } 722 } 723 } 724} 725 726func 
TestGossipsubControlPiggyback(t *testing.T) { 727 t.Skip("travis regularly fails on this test") 728 729 ctx, cancel := context.WithCancel(context.Background()) 730 defer cancel() 731 732 hosts := getNetHosts(t, ctx, 10) 733 psubs := getGossipsubs(ctx, hosts) 734 denseConnect(t, hosts) 735 736 for _, ps := range psubs { 737 subch, err := ps.Subscribe("flood") 738 if err != nil { 739 t.Fatal(err) 740 } 741 go func(sub *Subscription) { 742 for { 743 _, err := sub.Next(ctx) 744 if err != nil { 745 break 746 } 747 } 748 }(subch) 749 } 750 751 time.Sleep(time.Second * 1) 752 753 // create a background flood of messages that overloads the queues 754 done := make(chan struct{}) 755 go func() { 756 owner := rand.Intn(len(psubs)) 757 for i := 0; i < 10000; i++ { 758 msg := []byte("background flooooood") 759 psubs[owner].Publish("flood", msg) 760 } 761 done <- struct{}{} 762 }() 763 764 time.Sleep(time.Millisecond * 20) 765 766 // and subscribe to a bunch of topics in the meantime -- this should 767 // result in some dropped control messages, with subsequent piggybacking 768 // in the background flood 769 var topics []string 770 var msgs [][]*Subscription 771 for i := 0; i < 5; i++ { 772 topic := fmt.Sprintf("topic%d", i) 773 topics = append(topics, topic) 774 775 var subs []*Subscription 776 for _, ps := range psubs { 777 subch, err := ps.Subscribe(topic) 778 if err != nil { 779 t.Fatal(err) 780 } 781 782 subs = append(subs, subch) 783 } 784 msgs = append(msgs, subs) 785 } 786 787 // wait for the flood to stop 788 <-done 789 790 // and test that we have functional overlays 791 for i, topic := range topics { 792 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 793 794 owner := rand.Intn(len(psubs)) 795 796 psubs[owner].Publish(topic, msg) 797 798 for _, sub := range msgs[i] { 799 got, err := sub.Next(ctx) 800 if err != nil { 801 t.Fatal(sub.err) 802 } 803 if !bytes.Equal(msg, got.Data) { 804 t.Fatal("got wrong message!") 805 } 806 } 807 } 808} 809 810func 
TestMixedGossipsub(t *testing.T) { 811 ctx, cancel := context.WithCancel(context.Background()) 812 defer cancel() 813 hosts := getNetHosts(t, ctx, 30) 814 815 gsubs := getGossipsubs(ctx, hosts[:20]) 816 fsubs := getPubsubs(ctx, hosts[20:]) 817 psubs := append(gsubs, fsubs...) 818 819 var msgs []*Subscription 820 for _, ps := range psubs { 821 subch, err := ps.Subscribe("foobar") 822 if err != nil { 823 t.Fatal(err) 824 } 825 826 msgs = append(msgs, subch) 827 } 828 829 sparseConnect(t, hosts) 830 831 // wait for heartbeats to build mesh 832 time.Sleep(time.Second * 2) 833 834 for i := 0; i < 100; i++ { 835 msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) 836 837 owner := rand.Intn(len(psubs)) 838 839 psubs[owner].Publish("foobar", msg) 840 841 for _, sub := range msgs { 842 got, err := sub.Next(ctx) 843 if err != nil { 844 t.Fatal(sub.err) 845 } 846 if !bytes.Equal(msg, got.Data) { 847 t.Fatal("got wrong message!") 848 } 849 } 850 } 851} 852 853func TestGossipsubMultihops(t *testing.T) { 854 ctx, cancel := context.WithCancel(context.Background()) 855 defer cancel() 856 857 hosts := getNetHosts(t, ctx, 6) 858 859 psubs := getGossipsubs(ctx, hosts) 860 861 connect(t, hosts[0], hosts[1]) 862 connect(t, hosts[1], hosts[2]) 863 connect(t, hosts[2], hosts[3]) 864 connect(t, hosts[3], hosts[4]) 865 connect(t, hosts[4], hosts[5]) 866 867 var subs []*Subscription 868 for i := 1; i < 6; i++ { 869 ch, err := psubs[i].Subscribe("foobar") 870 if err != nil { 871 t.Fatal(err) 872 } 873 subs = append(subs, ch) 874 } 875 876 // wait for heartbeats to build mesh 877 time.Sleep(time.Second * 2) 878 879 msg := []byte("i like cats") 880 err := psubs[0].Publish("foobar", msg) 881 if err != nil { 882 t.Fatal(err) 883 } 884 885 // last node in the chain should get the message 886 select { 887 case out := <-subs[4].ch: 888 if !bytes.Equal(out.GetData(), msg) { 889 t.Fatal("got wrong data") 890 } 891 case <-time.After(time.Second * 5): 892 t.Fatal("timed out waiting for 
message") 893 } 894} 895 896func TestGossipsubTreeTopology(t *testing.T) { 897 ctx, cancel := context.WithCancel(context.Background()) 898 defer cancel() 899 900 hosts := getNetHosts(t, ctx, 10) 901 psubs := getGossipsubs(ctx, hosts) 902 903 connect(t, hosts[0], hosts[1]) 904 connect(t, hosts[1], hosts[2]) 905 connect(t, hosts[1], hosts[4]) 906 connect(t, hosts[2], hosts[3]) 907 connect(t, hosts[0], hosts[5]) 908 connect(t, hosts[5], hosts[6]) 909 connect(t, hosts[5], hosts[8]) 910 connect(t, hosts[6], hosts[7]) 911 connect(t, hosts[8], hosts[9]) 912 913 /* 914 [0] -> [1] -> [2] -> [3] 915 | L->[4] 916 v 917 [5] -> [6] -> [7] 918 | 919 v 920 [8] -> [9] 921 */ 922 923 var chs []*Subscription 924 for _, ps := range psubs { 925 ch, err := ps.Subscribe("fizzbuzz") 926 if err != nil { 927 t.Fatal(err) 928 } 929 930 chs = append(chs, ch) 931 } 932 933 // wait for heartbeats to build mesh 934 time.Sleep(time.Second * 2) 935 936 assertPeerLists(t, hosts, psubs[0], 1, 5) 937 assertPeerLists(t, hosts, psubs[1], 0, 2, 4) 938 assertPeerLists(t, hosts, psubs[2], 1, 3) 939 940 checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs) 941} 942 943// this tests overlay bootstrapping through px in Gossipsub v1.1 944// we start with a star topology and rely on px through prune to build the mesh 945func TestGossipsubStarTopology(t *testing.T) { 946 originalGossipSubD := GossipSubD 947 GossipSubD = 4 948 originalGossipSubDhi := GossipSubDhi 949 GossipSubDhi = GossipSubD + 1 950 originalGossipSubDlo := GossipSubDlo 951 GossipSubDlo = GossipSubD - 1 952 originalGossipSubDscore := GossipSubDscore 953 GossipSubDscore = GossipSubDlo 954 defer func() { 955 GossipSubD = originalGossipSubD 956 GossipSubDhi = originalGossipSubDhi 957 GossipSubDlo = originalGossipSubDlo 958 GossipSubDscore = originalGossipSubDscore 959 }() 960 961 ctx, cancel := context.WithCancel(context.Background()) 962 defer cancel() 963 964 hosts := getNetHosts(t, ctx, 20) 965 psubs := getGossipsubs(ctx, 
hosts, WithPeerExchange(true), WithFloodPublish(true)) 966 967 // configure the center of the star with a very low D 968 psubs[0].eval <- func() { 969 gs := psubs[0].rt.(*GossipSubRouter) 970 gs.D = 0 971 gs.Dlo = 0 972 gs.Dhi = 0 973 gs.Dscore = 0 974 } 975 976 // add all peer addresses to the peerstores 977 // this is necessary because we can't have signed address records witout identify 978 // pushing them 979 for i := range hosts { 980 for j := range hosts { 981 if i == j { 982 continue 983 } 984 hosts[i].Peerstore().AddAddrs(hosts[j].ID(), hosts[j].Addrs(), peerstore.PermanentAddrTTL) 985 } 986 } 987 988 // build the star 989 for i := 1; i < 20; i++ { 990 connect(t, hosts[0], hosts[i]) 991 } 992 993 time.Sleep(time.Second) 994 995 // build the mesh 996 var subs []*Subscription 997 for _, ps := range psubs { 998 sub, err := ps.Subscribe("test") 999 if err != nil { 1000 t.Fatal(err) 1001 } 1002 subs = append(subs, sub) 1003 } 1004 1005 // wait a bit for the mesh to build 1006 time.Sleep(10 * time.Second) 1007 1008 // check that all peers have > 1 connection 1009 for i, h := range hosts { 1010 if len(h.Network().Conns()) == 1 { 1011 t.Errorf("peer %d has ony a single connection", i) 1012 } 1013 } 1014 1015 // send a message from each peer and assert it was propagated 1016 for i := 0; i < 20; i++ { 1017 msg := []byte(fmt.Sprintf("message %d", i)) 1018 psubs[i].Publish("test", msg) 1019 1020 for _, sub := range subs { 1021 assertReceive(t, sub, msg) 1022 } 1023 } 1024} 1025 1026// this tests overlay bootstrapping through px in Gossipsub v1.1, with addresses 1027// exchanged in signed peer records. 
1028// we start with a star topology and rely on px through prune to build the mesh 1029func TestGossipsubStarTopologyWithSignedPeerRecords(t *testing.T) { 1030 originalGossipSubD := GossipSubD 1031 GossipSubD = 4 1032 originalGossipSubDhi := GossipSubDhi 1033 GossipSubDhi = GossipSubD + 1 1034 originalGossipSubDlo := GossipSubDlo 1035 GossipSubDlo = GossipSubD - 1 1036 originalGossipSubDscore := GossipSubDscore 1037 GossipSubDscore = GossipSubDlo 1038 defer func() { 1039 GossipSubD = originalGossipSubD 1040 GossipSubDhi = originalGossipSubDhi 1041 GossipSubDlo = originalGossipSubDlo 1042 GossipSubDscore = originalGossipSubDscore 1043 }() 1044 1045 ctx, cancel := context.WithCancel(context.Background()) 1046 defer cancel() 1047 1048 hosts := getNetHosts(t, ctx, 20) 1049 psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true)) 1050 1051 // configure the center of the star with a very low D 1052 psubs[0].eval <- func() { 1053 gs := psubs[0].rt.(*GossipSubRouter) 1054 gs.D = 0 1055 gs.Dlo = 0 1056 gs.Dhi = 0 1057 gs.Dscore = 0 1058 } 1059 1060 // manually create signed peer records for each host and add them to the 1061 // peerstore of the center of the star, which is doing the bootstrapping 1062 for i := range hosts[1:] { 1063 privKey := hosts[i].Peerstore().PrivKey(hosts[i].ID()) 1064 if privKey == nil { 1065 t.Fatalf("unable to get private key for host %s", hosts[i].ID().Pretty()) 1066 } 1067 ai := host.InfoFromHost(hosts[i]) 1068 rec := peer.PeerRecordFromAddrInfo(*ai) 1069 signedRec, err := record.Seal(rec, privKey) 1070 if err != nil { 1071 t.Fatalf("error creating signed peer record: %s", err) 1072 } 1073 1074 cab, ok := peerstore.GetCertifiedAddrBook(hosts[0].Peerstore()) 1075 if !ok { 1076 t.Fatal("peerstore does not implement CertifiedAddrBook") 1077 } 1078 _, err = cab.ConsumePeerRecord(signedRec, peerstore.PermanentAddrTTL) 1079 if err != nil { 1080 t.Fatalf("error adding signed peer record: %s", err) 1081 } 1082 } 1083 1084 // 
build the star 1085 for i := 1; i < 20; i++ { 1086 connect(t, hosts[0], hosts[i]) 1087 } 1088 1089 time.Sleep(time.Second) 1090 1091 // build the mesh 1092 var subs []*Subscription 1093 for _, ps := range psubs { 1094 sub, err := ps.Subscribe("test") 1095 if err != nil { 1096 t.Fatal(err) 1097 } 1098 subs = append(subs, sub) 1099 } 1100 1101 // wait a bit for the mesh to build 1102 time.Sleep(10 * time.Second) 1103 1104 // check that all peers have > 1 connection 1105 for i, h := range hosts { 1106 if len(h.Network().Conns()) == 1 { 1107 t.Errorf("peer %d has ony a single connection", i) 1108 } 1109 } 1110 1111 // send a message from each peer and assert it was propagated 1112 for i := 0; i < 20; i++ { 1113 msg := []byte(fmt.Sprintf("message %d", i)) 1114 psubs[i].Publish("test", msg) 1115 1116 for _, sub := range subs { 1117 assertReceive(t, sub, msg) 1118 } 1119 } 1120} 1121 1122func TestGossipsubDirectPeers(t *testing.T) { 1123 ctx, cancel := context.WithCancel(context.Background()) 1124 defer cancel() 1125 1126 h := getNetHosts(t, ctx, 3) 1127 psubs := []*PubSub{ 1128 getGossipsub(ctx, h[0], WithDirectConnectTicks(2)), 1129 getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[2].ID(), h[2].Addrs()}}), WithDirectConnectTicks(2)), 1130 getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[1].ID(), h[1].Addrs()}}), WithDirectConnectTicks(2)), 1131 } 1132 1133 connect(t, h[0], h[1]) 1134 connect(t, h[0], h[2]) 1135 1136 // verify that the direct peers connected 1137 time.Sleep(2 * time.Second) 1138 if len(h[1].Network().ConnsToPeer(h[2].ID())) == 0 { 1139 t.Fatal("expected a connection between direct peers") 1140 } 1141 1142 // build the mesh 1143 var subs []*Subscription 1144 for _, ps := range psubs { 1145 sub, err := ps.Subscribe("test") 1146 if err != nil { 1147 t.Fatal(err) 1148 } 1149 subs = append(subs, sub) 1150 } 1151 1152 time.Sleep(time.Second) 1153 1154 // publish some messages 1155 for i := 0; i < 3; i++ { 1156 msg 
:= []byte(fmt.Sprintf("message %d", i)) 1157 psubs[i].Publish("test", msg) 1158 1159 for _, sub := range subs { 1160 assertReceive(t, sub, msg) 1161 } 1162 } 1163 1164 // disconnect the direct peers to test reconnection 1165 for _, c := range h[1].Network().ConnsToPeer(h[2].ID()) { 1166 c.Close() 1167 } 1168 1169 time.Sleep(5 * time.Second) 1170 1171 if len(h[1].Network().ConnsToPeer(h[2].ID())) == 0 { 1172 t.Fatal("expected a connection between direct peers") 1173 } 1174 1175 // publish some messages 1176 for i := 0; i < 3; i++ { 1177 msg := []byte(fmt.Sprintf("message %d", i)) 1178 psubs[i].Publish("test", msg) 1179 1180 for _, sub := range subs { 1181 assertReceive(t, sub, msg) 1182 } 1183 } 1184} 1185 1186func TestGossipsubDirectPeersFanout(t *testing.T) { 1187 // regression test for #371 1188 ctx, cancel := context.WithCancel(context.Background()) 1189 defer cancel() 1190 1191 h := getNetHosts(t, ctx, 3) 1192 psubs := []*PubSub{ 1193 getGossipsub(ctx, h[0]), 1194 getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[2].ID(), h[2].Addrs()}})), 1195 getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[1].ID(), h[1].Addrs()}})), 1196 } 1197 1198 connect(t, h[0], h[1]) 1199 connect(t, h[0], h[2]) 1200 1201 // Join all peers except h2 1202 var subs []*Subscription 1203 for _, ps := range psubs[:2] { 1204 sub, err := ps.Subscribe("test") 1205 if err != nil { 1206 t.Fatal(err) 1207 } 1208 subs = append(subs, sub) 1209 } 1210 1211 time.Sleep(time.Second) 1212 1213 // h2 publishes some messages to build a fanout 1214 for i := 0; i < 3; i++ { 1215 msg := []byte(fmt.Sprintf("message %d", i)) 1216 psubs[2].Publish("test", msg) 1217 1218 for _, sub := range subs { 1219 assertReceive(t, sub, msg) 1220 } 1221 } 1222 1223 // verify that h0 is in the fanout of h2, but not h1 who is a direct peer 1224 result := make(chan bool, 2) 1225 psubs[2].eval <- func() { 1226 rt := psubs[2].rt.(*GossipSubRouter) 1227 fanout := rt.fanout["test"] 1228 _, 
ok := fanout[h[0].ID()] 1229 result <- ok 1230 _, ok = fanout[h[1].ID()] 1231 result <- ok 1232 } 1233 1234 inFanout := <-result 1235 if !inFanout { 1236 t.Fatal("expected peer 0 to be in fanout") 1237 } 1238 1239 inFanout = <-result 1240 if inFanout { 1241 t.Fatal("expected peer 1 to not be in fanout") 1242 } 1243 1244 // now subscribe h2 too and verify tht h0 is in the mesh but not h1 1245 _, err := psubs[2].Subscribe("test") 1246 if err != nil { 1247 t.Fatal(err) 1248 } 1249 1250 time.Sleep(2 * time.Second) 1251 1252 psubs[2].eval <- func() { 1253 rt := psubs[2].rt.(*GossipSubRouter) 1254 mesh := rt.mesh["test"] 1255 _, ok := mesh[h[0].ID()] 1256 result <- ok 1257 _, ok = mesh[h[1].ID()] 1258 result <- ok 1259 } 1260 1261 inMesh := <-result 1262 if !inMesh { 1263 t.Fatal("expected peer 0 to be in mesh") 1264 } 1265 1266 inMesh = <-result 1267 if inMesh { 1268 t.Fatal("expected peer 1 to not be in mesh") 1269 } 1270} 1271 1272func TestGossipsubFloodPublish(t *testing.T) { 1273 // uses a star topology without PX and publishes from the star to verify that all 1274 // messages get received 1275 ctx, cancel := context.WithCancel(context.Background()) 1276 defer cancel() 1277 1278 hosts := getNetHosts(t, ctx, 20) 1279 psubs := getGossipsubs(ctx, hosts, WithFloodPublish(true)) 1280 1281 // build the star 1282 for i := 1; i < 20; i++ { 1283 connect(t, hosts[0], hosts[i]) 1284 } 1285 1286 // build the (partial, unstable) mesh 1287 var subs []*Subscription 1288 for _, ps := range psubs { 1289 sub, err := ps.Subscribe("test") 1290 if err != nil { 1291 t.Fatal(err) 1292 } 1293 subs = append(subs, sub) 1294 } 1295 1296 time.Sleep(time.Second) 1297 1298 // send a message from the star and assert it was received 1299 for i := 0; i < 20; i++ { 1300 msg := []byte(fmt.Sprintf("message %d", i)) 1301 psubs[0].Publish("test", msg) 1302 1303 for _, sub := range subs { 1304 assertReceive(t, sub, msg) 1305 } 1306 } 1307} 1308 1309func TestGossipsubEnoughPeers(t *testing.T) { 1310 ctx, 
cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 20)
	psubs := getGossipsubs(ctx, hosts)

	var subs []*Subscription
	for _, ps := range psubs {
		sub, err := ps.Subscribe("test")
		if err != nil {
			t.Fatal(err)
		}
		subs = append(subs, sub)
	}

	// at this point we have no connections and no mesh, so EnoughPeers should return false
	res := make(chan bool, 1)
	psubs[0].eval <- func() {
		res <- psubs[0].rt.EnoughPeers("test", 0)
	}
	enough := <-res
	if enough {
		t.Fatal("should not have enough peers")
	}

	// connect them densly to build up the mesh
	denseConnect(t, hosts)

	time.Sleep(3 * time.Second)

	psubs[0].eval <- func() {
		res <- psubs[0].rt.EnoughPeers("test", 0)
	}
	enough = <-res
	if !enough {
		t.Fatal("should have enough peers")
	}
}

// TestGossipsubNegativeScore gives hosts[0] an application-specific score of
// -1000 on every node, lets all 20 nodes publish one message each, and then
// checks that hosts[0] only saw a single message and that no other node
// received a message forwarded by hosts[0].
func TestGossipsubNegativeScore(t *testing.T) {
	// in this test we score sinkhole a peer to exercise code paths relative to negative scores
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 20)
	psubs := getGossipsubs(ctx, hosts,
		WithPeerScore(
			&PeerScoreParams{
				AppSpecificScore: func(p peer.ID) float64 {
					if p == hosts[0].ID() {
						return -1000
					}
					return 0
				},
				AppSpecificWeight: 1,
				DecayInterval:     time.Second,
				DecayToZero:       0.01,
			},
			&PeerScoreThresholds{
				GossipThreshold:   -10,
				PublishThreshold:  -100,
				GraylistThreshold: -10000,
			}))

	denseConnect(t, hosts)

	var subs []*Subscription
	for _, ps := range psubs {
		sub, err := ps.Subscribe("test")
		if err != nil {
			t.Fatal(err)
		}
		subs = append(subs, sub)
	}

	time.Sleep(3 * time.Second)

	// every node publishes exactly one message
	for i := 0; i < 20; i++ {
		msg := []byte(fmt.Sprintf("message %d", i))
		psubs[i%len(psubs)].Publish("test", msg)
		time.Sleep(20 * time.Millisecond)
	}

	// let the sinkholed peer try to emit gossip as well
	time.Sleep(2 * time.Second)

	// checks:
	// 1. peer 0 should only receive its own message
	// 2. peers 1-19 should not receive a message from peer 0, because it's not part of the mesh
	//    and its gossip is rejected
	//
	// collectAll drains the subscription until a 100ms deadline makes Next fail.
	collectAll := func(sub *Subscription) []*Message {
		var res []*Message
		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		for {
			msg, err := sub.Next(ctx)
			if err != nil {
				break
			}

			res = append(res, msg)
		}

		return res
	}

	count := len(collectAll(subs[0]))
	if count != 1 {
		t.Fatalf("expected 1 message but got %d instead", count)
	}

	for _, sub := range subs[1:] {
		all := collectAll(sub)
		for _, m := range all {
			if m.ReceivedFrom == hosts[0].ID() {
				t.Fatal("received message from sinkholed peer")
			}
		}
	}
}

// TestGossipsubScoreValidatorEx registers a topic validator on psubs[0] that
// ignores messages from hosts[1] and rejects messages from hosts[2], then
// checks that neither message is delivered, that hosts[1] keeps a zero score
// and that hosts[2] ends up with a negative score.
func TestGossipsubScoreValidatorEx(t *testing.T) {
	// this is a test of the two message drop responses from a validator
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 3)
	psubs := getGossipsubs(ctx, hosts,
		WithPeerScore(
			&PeerScoreParams{
				AppSpecificScore: func(p peer.ID) float64 { return 0 },
				DecayInterval:    time.Second,
				DecayToZero:      0.01,
				Topics: map[string]*TopicScoreParams{
					"test": {
						TopicWeight:                    1,
						TimeInMeshQuantum:              time.Second,
						InvalidMessageDeliveriesWeight: -1,
						InvalidMessageDeliveriesDecay:  0.9999,
					},
				},
			},
			&PeerScoreThresholds{
				GossipThreshold:   -10,
				PublishThreshold:  -100,
				GraylistThreshold: -10000,
			}))

	connectAll(t, hosts)

	err := psubs[0].RegisterTopicValidator("test", func(ctx context.Context, p peer.ID, msg *Message) ValidationResult {
		// we ignore host1 and reject host2
		if p == hosts[1].ID() {
			return ValidationIgnore
		}
		if p == hosts[2].ID() {
			return ValidationReject
		}

		return ValidationAccept
	})
	if err != nil {
		t.Fatal(err)
	}

	sub, err := psubs[0].Subscribe("test")
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(100 * time.Millisecond)

	// expectNoMessage fails the test if a message arrives within 100ms.
	expectNoMessage := func(sub *Subscription) {
		ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
		defer cancel()

		m, err := sub.Next(ctx)
		if err == nil {
			t.Fatal("expected no message, but got ", string(m.Data))
		}
	}

	psubs[1].Publish("test", []byte("i am not a walrus"))
	psubs[2].Publish("test", []byte("i am not a walrus either"))

	// assert no messages
	expectNoMessage(sub)

	// assert that peer1's score is still 0 (its message was ignored) while peer2 should have
	// a negative score (its message got rejected)
	res := make(chan float64, 1)
	psubs[0].eval <- func() {
		res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[1].ID())
	}
	score := <-res
	if score != 0 {
		t.Fatalf("expected 0 score for peer1, but got %f", score)
	}

	psubs[0].eval <- func() {
		res <- psubs[0].rt.(*GossipSubRouter).score.Score(hosts[2].ID())
	}
	score = <-res
	if score >= 0 {
		t.Fatalf("expected negative score for peer2, but got %f", score)
	}
}

// TestGossipsubPiggybackControl calls piggybackControl directly with GRAFTs
// and PRUNEs for three topics, where the target peer is in the mesh only for
// "test1", and expects the resulting RPC to carry the GRAFT for "test1" and
// the PRUNEs for "test2" and "test3".
func TestGossipsubPiggybackControl(t *testing.T) {
	// this is a direct test of the piggybackControl function as we can't reliably
	// trigger it on travis
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
	ps := getGossipsub(ctx, h)

	blah := peer.ID("bogotr0n")

	res := make(chan *RPC, 1)
	ps.eval <- func() {
		gs := ps.rt.(*GossipSubRouter)
		test1 := "test1"
		test2 := "test2"
		test3 := "test3"
		// blah is a mesh member for test1 only; test2 has an empty mesh and
		// test3 has no mesh entry at all
		gs.mesh[test1] = make(map[peer.ID]struct{})
		gs.mesh[test2] = make(map[peer.ID]struct{})
		gs.mesh[test1][blah] = struct{}{}

		rpc := &RPC{RPC: pb.RPC{}}
		gs.piggybackControl(blah, rpc, &pb.ControlMessage{
			Graft: []*pb.ControlGraft{{TopicID: &test1}, {TopicID: &test2}, {TopicID: &test3}},
			Prune: []*pb.ControlPrune{{TopicID: &test1}, {TopicID: &test2}, {TopicID: &test3}},
		})
		res <- rpc
	}

	rpc := <-res
	if rpc.Control == nil {
		t.Fatal("expected non-nil control message")
	}
	if len(rpc.Control.Graft) != 1 {
		t.Fatal("expected 1 GRAFT")
	}
	if rpc.Control.Graft[0].GetTopicID() != "test1" {
		t.Fatal("expected test1 as graft topic ID")
	}
	if len(rpc.Control.Prune) != 2 {
		t.Fatal("expected 2 PRUNEs")
	}
	if rpc.Control.Prune[0].GetTopicID() != "test2" {
		t.Fatal("expected test2 as prune topic ID")
	}
	if rpc.Control.Prune[1].GetTopicID() != "test3" {
		t.Fatal("expected test3 as prune topic ID")
	}
}

// TestGossipsubMultipleGraftTopics sends GRAFTs for three topics from the
// first peer to the second in a single sendGraftPrune call and expects the
// first peer to appear in the second peer's mesh for each of the topics.
func TestGossipsubMultipleGraftTopics(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 2)
	psubs := getGossipsubs(ctx, hosts)
	sparseConnect(t, hosts)

	time.Sleep(time.Second * 1)

	firstTopic := "topic1"
	secondTopic := "topic2"
	thirdTopic := "topic3"

	firstPeer := hosts[0].ID()
	secondPeer := hosts[1].ID()

	p2Sub := psubs[1]
	p1Router := psubs[0].rt.(*GossipSubRouter)
	p2Router := psubs[1].rt.(*GossipSubRouter)

	// finChan signals that a closure sent over eval has finished running
	finChan := make(chan struct{})

	p2Sub.eval <- func() {
		// Add topics to second peer
		p2Router.mesh[firstTopic] = map[peer.ID]struct{}{}
		p2Router.mesh[secondTopic] = map[peer.ID]struct{}{}
		p2Router.mesh[thirdTopic] = map[peer.ID]struct{}{}

		finChan <- struct{}{}
	}
	<-finChan

	// Send multiple GRAFT messages to second peer from
	// 1st peer
	p1Router.sendGraftPrune(map[peer.ID][]string{
		secondPeer: {firstTopic, secondTopic, thirdTopic},
	}, map[peer.ID][]string{}, map[peer.ID]bool{})

	time.Sleep(time.Second * 1)

	p2Sub.eval <- func() {
		if _, ok := p2Router.mesh[firstTopic][firstPeer]; !ok {
			t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", firstTopic)
		}
		if _, ok := p2Router.mesh[secondTopic][firstPeer]; !ok {
			t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", secondTopic)
		}
		if _, ok := p2Router.mesh[thirdTopic][firstPeer]; !ok {
			t.Errorf("First peer wasnt added to mesh of the second peer for the topic %s", thirdTopic)
		}
		finChan <- struct{}{}
	}
	<-finChan
}

// TestGossipsubOpportunisticGrafting runs 10 real gossipsub nodes alongside
// 40 sybilSquatter hosts that subscribe to the topic but only read and
// discard RPCs. After publishing 1000 messages and waiting, every real node
// is expected to have at least 3 of the real hosts in its mesh for "test".
func TestGossipsubOpportunisticGrafting(t *testing.T) {
	// shorten the package-level timing parameters for the duration of the
	// test and restore them afterwards
	originalGossipSubPruneBackoff := GossipSubPruneBackoff
	GossipSubPruneBackoff = 500 * time.Millisecond
	originalGossipSubGraftFloodThreshold := GossipSubGraftFloodThreshold
	GossipSubGraftFloodThreshold = 100 * time.Millisecond
	originalGossipSubOpportunisticGraftTicks := GossipSubOpportunisticGraftTicks
	GossipSubOpportunisticGraftTicks = 2
	defer func() {
		GossipSubPruneBackoff = originalGossipSubPruneBackoff
		GossipSubGraftFloodThreshold = originalGossipSubGraftFloodThreshold
		GossipSubOpportunisticGraftTicks = originalGossipSubOpportunisticGraftTicks
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 50)
	// pubsubs for the first 10 hosts
	psubs := getGossipsubs(ctx, hosts[:10],
		WithFloodPublish(true),
		WithPeerScore(
			&PeerScoreParams{
				AppSpecificScore:  func(peer.ID) float64 { return 0 },
				AppSpecificWeight: 0,
				DecayInterval:     time.Second,
				DecayToZero:       0.01,
				Topics: map[string]*TopicScoreParams{
					"test": {
						TopicWeight:                   1,
						TimeInMeshWeight:              0.0002777,
						TimeInMeshQuantum:             time.Second,
						TimeInMeshCap:                 3600,
						FirstMessageDeliveriesWeight:  1,
						FirstMessageDeliveriesDecay:   0.9997,
						FirstMessageDeliveriesCap:     100,
						InvalidMessageDeliveriesDecay: 0.99997,
					},
				},
			},
			&PeerScoreThresholds{
				GossipThreshold:             -10,
				PublishThreshold:            -100,
				GraylistThreshold:           -10000,
				OpportunisticGraftThreshold: 1,
			}))

	// connect the real hosts with degree 5
	connectSome(t, hosts[:10], 5)

	// sybil squatters for the remaining 40 hosts
	for _, h := range hosts[10:] {
		squatter := &sybilSquatter{h: h}
		h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
	}

	// connect all squatters to every real host
	for _, squatter := range hosts[10:] {
		for _, honest := range hosts[:10] {
			connect(t, squatter, honest)
		}
	}

	// wait a bit for the connections to propagate events to the pubsubs
	time.Sleep(time.Second)

	// ask the real pubsubs to join the topic
	for _, ps := range psubs {
		sub, err := ps.Subscribe("test")
		if err != nil {
			t.Fatal(err)
		}
		// consume the messages; the goroutine exits once Next fails, which
		// happens when ctx is cancelled at the end of the test
		go func(sub *Subscription) {
			for {
				_, err := sub.Next(ctx)
				if err != nil {
					return
				}
			}
		}(sub)
	}

	// publish a bunch of messages from the real hosts
	for i := 0; i < 1000; i++ {
		msg := []byte(fmt.Sprintf("message %d", i))
		psubs[i%10].Publish("test", msg)
		time.Sleep(20 * time.Millisecond)
	}

	// now wait a few of oppgraft cycles
	time.Sleep(7 * time.Second)

	// check the honest peer meshes, they should have at least 3 honest peers each
	res := make(chan int, 1)
	for _, ps := range psubs {
		// the closure is consumed before the next iteration because we block
		// on res right after sending it
		ps.eval <- func() {
			gs := ps.rt.(*GossipSubRouter)
			count := 0
			for _, h := range hosts[:10] {
				_, ok := gs.mesh["test"][h.ID()]
				if ok {
					count++
				}
			}
			res <- count
		}

		count := <-res
		if count < 3 {
			t.Fatalf("expected at least 3 honest peers, got %d", count)
		}
	}
}

// sybilSquatter is a fake gossipsub peer that subscribes to the "test" topic
// and then reads and discards every RPC it receives.
type sybilSquatter struct {
	h host.Host
}

// handleStream handles an inbound gossipsub stream: it opens an outbound
// stream to the remote peer, announces a subscription to "test" on it and
// then drains the inbound stream until it fails. It panics if the outbound
// stream cannot be opened or the subscription cannot be written.
func (sq *sybilSquatter) handleStream(s network.Stream) {
	defer s.Close()

	os, err := sq.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10)
	if err != nil {
		panic(err)
	}

	// send a subscription for test in the output stream to become candidate for GRAFT
	// and then just read and ignore the incoming RPCs
	r := protoio.NewDelimitedReader(s, 1<<20)
	w := protoio.NewDelimitedWriter(os)
	truth := true
	topic := "test"
	err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}})
	if err != nil {
		panic(err)
	}

	var rpc pb.RPC
	for {
		rpc.Reset()
		err = r.ReadMsg(&rpc)
		if err != nil {
			// anything other than a clean EOF resets the stream
			if err != io.EOF {
				s.Reset()
			}
			return
		}
	}
}

// TestGossipsubPeerScoreInspect attaches a mockPeerScoreInspector to the
// first of two connected nodes, publishes 20 messages alternating between
// them, and expects the score last reported for the second host to be at
// least 9.
func TestGossipsubPeerScoreInspect(t *testing.T) {
	// this test exercises the code paths of peer score inspection
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 2)

	inspector := &mockPeerScoreInspector{}
	psub1 := getGossipsub(ctx, hosts[0],
		WithPeerScore(
			&PeerScoreParams{
				Topics: map[string]*TopicScoreParams{
					"test": {
						TopicWeight:                    1,
						TimeInMeshQuantum:              time.Second,
						FirstMessageDeliveriesWeight:   1,
						FirstMessageDeliveriesDecay:    0.999,
						FirstMessageDeliveriesCap:      100,
						InvalidMessageDeliveriesWeight: -1,
						InvalidMessageDeliveriesDecay:  0.9999,
					},
				},
				AppSpecificScore: func(peer.ID) float64 { return 0 },
				DecayInterval:    time.Second,
				DecayToZero:      0.01,
			},
			&PeerScoreThresholds{
				GossipThreshold:   -1,
				PublishThreshold:  -10,
				GraylistThreshold: -1000,
			}),
		WithPeerScoreInspect(inspector.inspect, time.Second))
	psub2 := getGossipsub(ctx, hosts[1])
	psubs := []*PubSub{psub1, psub2}

	connect(t, hosts[0], hosts[1])

	var subs []*Subscription
	for _, ps := range psubs {
		sub, err := ps.Subscribe("test")
		if err != nil {
			t.Fatal(err)
		}
		subs = append(subs, sub)
	}

	time.Sleep(time.Second)

	for i := 0; i < 20; i++ {
		msg := []byte(fmt.Sprintf("message %d", i))
		psubs[i%2].Publish("test", msg)
		time.Sleep(20 * time.Millisecond)
	}

	// wait a little longer than the one second inspect period passed to
	// WithPeerScoreInspect
	time.Sleep(time.Second + 200*time.Millisecond)

	score2 := inspector.score(hosts[1].ID())
	if score2 < 9 {
		t.Fatalf("expected score to be at least 9, instead got %f", score2)
	}
}

// TestGossipsubPeerScoreResetTopicParams joins "test" on a scoring-enabled
// node and checks that SetScoreParams accepts a new parameter set that
// differs from the initial one only in FirstMessageDeliveriesCap.
func TestGossipsubPeerScoreResetTopicParams(t *testing.T) {
	// this test exercises the code path of resetting the score params of a topic
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 1)

	ps := getGossipsub(ctx, hosts[0],
		WithPeerScore(
			&PeerScoreParams{
				Topics: map[string]*TopicScoreParams{
					"test": {
						TopicWeight:                    1,
						TimeInMeshQuantum:              time.Second,
						FirstMessageDeliveriesWeight:   1,
						FirstMessageDeliveriesDecay:    0.999,
						FirstMessageDeliveriesCap:      100,
						InvalidMessageDeliveriesWeight: -1,
						InvalidMessageDeliveriesDecay:  0.9999,
					},
				},
				AppSpecificScore: func(peer.ID) float64 { return 0 },
				DecayInterval:    time.Second,
				DecayToZero:      0.01,
			},
			&PeerScoreThresholds{
				GossipThreshold:   -1,
				PublishThreshold:  -10,
				GraylistThreshold: -1000,
			}))

	topic, err := ps.Join("test")
	if err != nil {
		t.Fatal(err)
	}

	err = topic.SetScoreParams(
		&TopicScoreParams{
			TopicWeight:                    1,
			TimeInMeshQuantum:              time.Second,
			FirstMessageDeliveriesWeight:   1,
			FirstMessageDeliveriesDecay:    0.999,
			FirstMessageDeliveriesCap:      200,
			InvalidMessageDeliveriesWeight: -1,
			InvalidMessageDeliveriesDecay:  0.9999,
		})
	if err != nil {
		t.Fatal(err)
	}
}

// mockPeerScoreInspector stores the most recent score map handed to inspect
// so that a test can read it back; mx guards scores.
type mockPeerScoreInspector struct {
	mx     sync.Mutex
	scores map[peer.ID]float64
}

// inspect replaces the stored score map with scores.
func (ps *mockPeerScoreInspector) inspect(scores map[peer.ID]float64) {
	ps.mx.Lock()
	defer ps.mx.Unlock()
	ps.scores = scores
}

// score returns the stored score for p, or 0 if none has been recorded.
func (ps *mockPeerScoreInspector) score(p peer.ID) float64 {
	ps.mx.Lock()
	defer ps.mx.Unlock()
	return ps.scores[p]
}

// TestGossipsubRPCFragmentation publishes 1000 messages of 20000 bytes from a
// real node to an iwantEverything peer and checks that the fake peer saw
// every message, saw an IHAVE for every message, and received at least
// (nMessages * msgSize) / ps.maxMessageSize RPCs containing messages.
func TestGossipsubRPCFragmentation(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	hosts := getNetHosts(t, ctx, 2)
	ps := getGossipsub(ctx, hosts[0])

	// make a fake peer that requests everything through IWANT gossip
	iwe := iwantEverything{h: hosts[1]}
	iwe.h.SetStreamHandler(GossipSubID_v10, iwe.handleStream)

	connect(t, hosts[0], hosts[1])

	// have the real pubsub join the test topic
	_, err := ps.Subscribe("test")
	if err != nil {
		t.Fatal(err)
	}

	// wait for the real pubsub to connect and try to graft to the faker
	time.Sleep(time.Second)

	// publish a bunch of fairly large messages from the real host
	nMessages := 1000
	msgSize := 20000
	for i := 0; i < nMessages; i++ {
		msg := make([]byte, msgSize)
		rand.Read(msg)
		ps.Publish("test", msg)
		time.Sleep(20 * time.Millisecond)
	}

	// wait a bit for them to be received via gossip by the fake peer
	time.Sleep(5 * time.Second)
	iwe.lk.Lock()
	defer iwe.lk.Unlock()

	// we should have received all the messages
	if iwe.msgsReceived != nMessages {
		t.Fatalf("expected fake gossipsub peer to receive all messages, got %d / %d", iwe.msgsReceived, nMessages)
	}

	// and we should have seen an IHAVE message for each of them
	if iwe.ihavesReceived != nMessages {
		t.Fatalf("expected to get IHAVEs for every message, got %d / %d", iwe.ihavesReceived, nMessages)
	}

	// If everything were fragmented with maximum efficiency, we would expect to get
	// (nMessages * msgSize) / ps.maxMessageSize total RPCs containing the messages we sent IWANTs for.
	// The actual number will probably be larger, since there's some overhead for the RPC itself, and
	// we probably aren't packing each RPC to its maximum size
	minExpectedRPCS := (nMessages * msgSize) / ps.maxMessageSize
	if iwe.rpcsWithMessages < minExpectedRPCS {
		t.Fatalf("expected to receive at least %d RPCs containing messages, got %d", minExpectedRPCS, iwe.rpcsWithMessages)
	}
}

// iwantEverything is a simple gossipsub client that never grafts onto a mesh,
// instead requesting everything through IWANT gossip messages. It is used to
// test that large responses to IWANT requests are fragmented into multiple RPCs.
// The counters are guarded by lk.
type iwantEverything struct {
	h                host.Host
	lk               sync.Mutex
	rpcsWithMessages int
	msgsReceived     int
	ihavesReceived   int
}

// handleStream handles an inbound gossipsub stream: it opens an outbound
// stream to the remote peer, announces a subscription to "test", and then for
// every incoming RPC counts published messages and advertised message ids,
// answers each GRAFT with a PRUNE and each IHAVE with an IWANT. It panics if
// the outbound stream cannot be opened or a write on it fails.
func (iwe *iwantEverything) handleStream(s network.Stream) {
	defer s.Close()

	os, err := iwe.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10)
	if err != nil {
		panic(err)
	}

	msgIdsReceived := make(map[string]struct{})
	gossipMsgIdsReceived := make(map[string]struct{})

	// send a subscription for test in the output stream to become candidate for gossip
	r := protoio.NewDelimitedReader(s, 1<<20)
	w := protoio.NewDelimitedWriter(os)
	truth := true
	topic := "test"
	err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}})
	if err != nil {
		// without the subscription nothing below can work, so fail loudly
		// like sybilSquatter.handleStream does
		panic(err)
	}

	var rpc pb.RPC
	for {
		rpc.Reset()
		err = r.ReadMsg(&rpc)
		if err != nil {
			// anything other than a clean EOF resets the stream
			if err != io.EOF {
				s.Reset()
			}
			return
		}

		iwe.lk.Lock()
		if len(rpc.Publish) != 0 {
			iwe.rpcsWithMessages++
		}
		// keep track of unique message ids received
		for _, msg := range rpc.Publish {
			id := string(msg.Seqno)
			if _, seen := msgIdsReceived[id]; !seen {
				iwe.msgsReceived++
			}
			msgIdsReceived[id] = struct{}{}
		}

		if rpc.Control != nil {
			// send a PRUNE for all grafts, so we don't get direct message deliveries
			var prunes []*pb.ControlPrune
			for _, graft := range rpc.Control.Graft {
				prunes = append(prunes, &pb.ControlPrune{TopicID: graft.TopicID})
			}

			// request every advertised message, counting each id only once
			var iwants []*pb.ControlIWant
			for _, ihave := range rpc.Control.Ihave {
				iwants = append(iwants, &pb.ControlIWant{MessageIDs: ihave.MessageIDs})
				for _, msgId := range ihave.MessageIDs {
					if _, seen := gossipMsgIdsReceived[msgId]; !seen {
						iwe.ihavesReceived++
					}
					gossipMsgIdsReceived[msgId] = struct{}{}
				}
			}

			out := rpcWithControl(nil, nil, iwants, nil, prunes)
			err = w.WriteMsg(out)
			if err != nil {
				panic(err)
			}
		}
		iwe.lk.Unlock()
	}
}

// TestFragmentRPCFunction exercises fragmentRPC directly with a 1024 byte
// limit: an RPC that fits is returned whole, a single oversized message is an
// error, many small messages are split into RPCs below the limit, control
// messages end up in trailing RPCs, and an oversized gossip message id is
// dropped while smaller ids are kept.
func TestFragmentRPCFunction(t *testing.T) {
	p := peer.ID("some-peer")
	topic := "test"
	rpc := &RPC{from: p}
	limit := 1024

	// mkMsg builds a message with size-4 bytes of random payload.
	mkMsg := func(size int) *pb.Message {
		msg := &pb.Message{}
		msg.Data = make([]byte, size-4) // subtract the protobuf overhead, so msg.Size() returns requested size
		rand.Read(msg.Data)
		return msg
	}

	// ensureBelowLimit fails the test if any fragment is larger than limit.
	ensureBelowLimit := func(rpcs []*RPC) {
		for _, r := range rpcs {
			if r.Size() > limit {
				t.Fatalf("expected fragmented RPC to be below %d bytes, was %d", limit, r.Size())
			}
		}
	}

	// it should not fragment if everything fits in one RPC
	rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(10)}
	results, err := fragmentRPC(rpc, limit)
	if err != nil {
		t.Fatal(err)
	}
	if len(results) != 1 {
		t.Fatalf("expected single RPC if input is < limit, got %d", len(results))
	}

	// if there's a message larger than the limit, we should fail
	rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(limit * 2)}
	results, err = fragmentRPC(rpc, limit)
	if err == nil {
		t.Fatalf("expected an error if a message exceeds limit, got %d RPCs instead", len(results))
	}

	// if the individual messages are below the limit, but the RPC as a whole is larger, we should fragment
	nMessages := 100
	msgSize := 200
	truth := true
	rpc.Subscriptions = []*pb.RPC_SubOpts{
		{
			Subscribe: &truth,
			Topicid:   &topic,
		},
	}
	rpc.Publish = make([]*pb.Message, nMessages)
	for i := 0; i < nMessages; i++ {
		rpc.Publish[i] = mkMsg(msgSize)
	}
	results, err = fragmentRPC(rpc, limit)
	if err != nil {
		t.Fatal(err)
	}
	ensureBelowLimit(results)
	msgsPerRPC := limit / msgSize
	expectedRPCs := nMessages / msgsPerRPC
	if len(results) != expectedRPCs {
		t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
	}
	var nMessagesFragmented int
	var nSubscriptions int
	for _, r := range results {
		nMessagesFragmented += len(r.Publish)
		nSubscriptions += len(r.Subscriptions)
	}
	if nMessagesFragmented != nMessages {
		t.Fatalf("expected fragemented RPCs to contain same number of messages as input, got %d / %d", nMessagesFragmented, nMessages)
	}
	if nSubscriptions != 1 {
		t.Fatal("expected subscription to be present in one of the fragmented messages, but not found")
	}

	// if we're fragmenting, and the input RPC has control messages,
	// the control messages should be in a separate RPC at the end
	// reuse RPC from prev test, but add a control message
	rpc.Control = &pb.ControlMessage{
		Graft: []*pb.ControlGraft{{TopicID: &topic}},
		Prune: []*pb.ControlPrune{{TopicID: &topic}},
		Ihave: []*pb.ControlIHave{{MessageIDs: []string{"foo"}}},
		Iwant: []*pb.ControlIWant{{MessageIDs: []string{"bar"}}},
	}
	results, err = fragmentRPC(rpc, limit)
	if err != nil {
		t.Fatal(err)
	}
	ensureBelowLimit(results)
	// we expect one more RPC than last time, with the final one containing the control messages
	expectedCtrl := 1
	expectedRPCs = (nMessages / msgsPerRPC) + expectedCtrl
	if len(results) != expectedRPCs {
		t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
	}
	ctl := results[len(results)-1].Control
	if ctl == nil {
		t.Fatal("expected final fragmented RPC to contain control messages, but .Control was nil")
	}
	// since it was not altered, the original control message should be identical to the output control message
	originalBytes, err := rpc.Control.Marshal()
	if err != nil {
		t.Fatal(err)
	}
	receivedBytes, err := ctl.Marshal()
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(originalBytes, receivedBytes) {
		t.Fatal("expected control message to be unaltered if it fits within one RPC message")
	}

	// if the control message is too large to fit into a single RPC, it should be split into multiple RPCs
	nTopics := 5 // pretend we're subscribed to multiple topics and sending IHAVE / IWANTs for each
	messageIdSize := 32
	msgsPerTopic := 100 // enough that a single IHAVE or IWANT will exceed the limit
	rpc.Control.Ihave = make([]*pb.ControlIHave, nTopics)
	rpc.Control.Iwant = make([]*pb.ControlIWant, nTopics)
	for i := 0; i < nTopics; i++ {
		messageIds := make([]string, msgsPerTopic)
		for m := 0; m < msgsPerTopic; m++ {
			mid := make([]byte, messageIdSize)
			rand.Read(mid)
			messageIds[m] = string(mid)
		}
		rpc.Control.Ihave[i] = &pb.ControlIHave{MessageIDs: messageIds}
		rpc.Control.Iwant[i] = &pb.ControlIWant{MessageIDs: messageIds}
	}
	results, err = fragmentRPC(rpc, limit)
	if err != nil {
		t.Fatal(err)
	}
	ensureBelowLimit(results)
	minExpectedCtl := rpc.Control.Size() / limit
	minExpectedRPCs := (nMessages / msgsPerRPC) + minExpectedCtl
	if len(results) < minExpectedRPCs {
		// report the bounds that were actually compared against
		t.Fatalf("expected at least %d total RPCs (at least %d with control messages), got %d total", minExpectedRPCs, minExpectedCtl, len(results))
	}

	// Test the pathological case where a single gossip message ID exceeds the limit.
	// It should not be present in the fragmented messages, but smaller IDs should be
	rpc.Reset()
	giantIdBytes := make([]byte, limit*2)
	rand.Read(giantIdBytes)
	rpc.Control = &pb.ControlMessage{
		Iwant: []*pb.ControlIWant{
			{MessageIDs: []string{"hello", string(giantIdBytes)}},
		},
	}
	results, err = fragmentRPC(rpc, limit)
	if err != nil {
		t.Fatal(err)
	}
	if len(results) != 1 {
		t.Fatalf("expected 1 RPC, got %d", len(results))
	}
	if len(results[0].Control.Iwant) != 1 {
		t.Fatalf("expected 1 IWANT, got %d", len(results[0].Control.Iwant))
	}
	if results[0].Control.Iwant[0].MessageIDs[0] != "hello" {
		t.Fatalf("expected small message ID to be included unaltered, got %s instead",
			results[0].Control.Iwant[0].MessageIDs[0])
	}
}