1package self_test 2 3import ( 4 "context" 5 "fmt" 6 "io/ioutil" 7 "net" 8 "sync" 9 10 quic "github.com/lucas-clemente/quic-go" 11 "github.com/lucas-clemente/quic-go/internal/protocol" 12 13 . "github.com/onsi/ginkgo" 14 . "github.com/onsi/gomega" 15) 16 17var _ = Describe("Unidirectional Streams", func() { 18 const numStreams = 500 19 20 var ( 21 server quic.Listener 22 serverAddr string 23 qconf *quic.Config 24 ) 25 26 BeforeEach(func() { 27 var err error 28 qconf = &quic.Config{Versions: []protocol.VersionNumber{protocol.VersionTLS}} 29 server, err = quic.ListenAddr("localhost:0", getTLSConfig(), getQuicConfig(qconf)) 30 Expect(err).ToNot(HaveOccurred()) 31 serverAddr = fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port) 32 }) 33 34 AfterEach(func() { 35 server.Close() 36 }) 37 38 dataForStream := func(id protocol.StreamID) []byte { 39 return GeneratePRData(10 * int(id)) 40 } 41 42 runSendingPeer := func(sess quic.Session) { 43 for i := 0; i < numStreams; i++ { 44 str, err := sess.OpenUniStreamSync(context.Background()) 45 Expect(err).ToNot(HaveOccurred()) 46 go func() { 47 defer GinkgoRecover() 48 _, err := str.Write(dataForStream(str.StreamID())) 49 Expect(err).ToNot(HaveOccurred()) 50 Expect(str.Close()).To(Succeed()) 51 }() 52 } 53 } 54 55 runReceivingPeer := func(sess quic.Session) { 56 var wg sync.WaitGroup 57 wg.Add(numStreams) 58 for i := 0; i < numStreams; i++ { 59 str, err := sess.AcceptUniStream(context.Background()) 60 Expect(err).ToNot(HaveOccurred()) 61 go func() { 62 defer GinkgoRecover() 63 defer wg.Done() 64 data, err := ioutil.ReadAll(str) 65 Expect(err).ToNot(HaveOccurred()) 66 Expect(data).To(Equal(dataForStream(str.StreamID()))) 67 }() 68 } 69 wg.Wait() 70 } 71 72 It(fmt.Sprintf("client opening %d streams to a server", numStreams), func() { 73 go func() { 74 defer GinkgoRecover() 75 sess, err := server.Accept(context.Background()) 76 Expect(err).ToNot(HaveOccurred()) 77 runReceivingPeer(sess) 78 sess.CloseWithError(0, "") 79 }() 80 81 client, err := quic.DialAddr( 82 serverAddr, 83 getTLSClientConfig(), 84 getQuicConfig(qconf), 85 ) 86 Expect(err).ToNot(HaveOccurred()) 87 runSendingPeer(client) 88 <-client.Context().Done() 89 }) 90 91 It(fmt.Sprintf("server opening %d streams to a client", numStreams), func() { 92 go func() { 93 defer GinkgoRecover() 94 sess, err := server.Accept(context.Background()) 95 Expect(err).ToNot(HaveOccurred()) 96 runSendingPeer(sess) 97 }() 98 99 client, err := quic.DialAddr( 100 serverAddr, 101 getTLSClientConfig(), 102 getQuicConfig(qconf), 103 ) 104 Expect(err).ToNot(HaveOccurred()) 105 runReceivingPeer(client) 106 }) 107 108 It(fmt.Sprintf("client and server opening %d streams each and sending data to the peer", numStreams), func() { 109 done1 := make(chan struct{}) 110 go func() { 111 defer GinkgoRecover() 112 sess, err := server.Accept(context.Background()) 113 Expect(err).ToNot(HaveOccurred()) 114 done := make(chan struct{}) 115 go func() { 116 defer GinkgoRecover() 117 runReceivingPeer(sess) 118 close(done) 119 }() 120 runSendingPeer(sess) 121 <-done 122 close(done1) 123 }() 124 125 client, err := quic.DialAddr( 126 serverAddr, 127 getTLSClientConfig(), 128 getQuicConfig(qconf), 129 ) 130 Expect(err).ToNot(HaveOccurred()) 131 done2 := make(chan struct{}) 132 go func() { 133 defer GinkgoRecover() 134 runSendingPeer(client) 135 close(done2) 136 }() 137 runReceivingPeer(client) 138 <-done1 139 <-done2 140 }) 141}) 142