1package self_test
2
3import (
4	"context"
5	"fmt"
6	"io/ioutil"
7	"net"
8	"sync"
9
10	quic "github.com/lucas-clemente/quic-go"
11	"github.com/lucas-clemente/quic-go/internal/protocol"
12
13	. "github.com/onsi/ginkgo"
14	. "github.com/onsi/gomega"
15)
16
17var _ = Describe("Unidirectional Streams", func() {
18	const numStreams = 500
19
20	var (
21		server     quic.Listener
22		serverAddr string
23		qconf      *quic.Config
24	)
25
26	BeforeEach(func() {
27		var err error
28		qconf = &quic.Config{Versions: []protocol.VersionNumber{protocol.VersionTLS}}
29		server, err = quic.ListenAddr("localhost:0", getTLSConfig(), getQuicConfig(qconf))
30		Expect(err).ToNot(HaveOccurred())
31		serverAddr = fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port)
32	})
33
34	AfterEach(func() {
35		server.Close()
36	})
37
38	dataForStream := func(id protocol.StreamID) []byte {
39		return GeneratePRData(10 * int(id))
40	}
41
42	runSendingPeer := func(sess quic.Session) {
43		for i := 0; i < numStreams; i++ {
44			str, err := sess.OpenUniStreamSync(context.Background())
45			Expect(err).ToNot(HaveOccurred())
46			go func() {
47				defer GinkgoRecover()
48				_, err := str.Write(dataForStream(str.StreamID()))
49				Expect(err).ToNot(HaveOccurred())
50				Expect(str.Close()).To(Succeed())
51			}()
52		}
53	}
54
55	runReceivingPeer := func(sess quic.Session) {
56		var wg sync.WaitGroup
57		wg.Add(numStreams)
58		for i := 0; i < numStreams; i++ {
59			str, err := sess.AcceptUniStream(context.Background())
60			Expect(err).ToNot(HaveOccurred())
61			go func() {
62				defer GinkgoRecover()
63				defer wg.Done()
64				data, err := ioutil.ReadAll(str)
65				Expect(err).ToNot(HaveOccurred())
66				Expect(data).To(Equal(dataForStream(str.StreamID())))
67			}()
68		}
69		wg.Wait()
70	}
71
72	It(fmt.Sprintf("client opening %d streams to a server", numStreams), func() {
73		go func() {
74			defer GinkgoRecover()
75			sess, err := server.Accept(context.Background())
76			Expect(err).ToNot(HaveOccurred())
77			runReceivingPeer(sess)
78			sess.CloseWithError(0, "")
79		}()
80
81		client, err := quic.DialAddr(
82			serverAddr,
83			getTLSClientConfig(),
84			getQuicConfig(qconf),
85		)
86		Expect(err).ToNot(HaveOccurred())
87		runSendingPeer(client)
88		<-client.Context().Done()
89	})
90
91	It(fmt.Sprintf("server opening %d streams to a client", numStreams), func() {
92		go func() {
93			defer GinkgoRecover()
94			sess, err := server.Accept(context.Background())
95			Expect(err).ToNot(HaveOccurred())
96			runSendingPeer(sess)
97		}()
98
99		client, err := quic.DialAddr(
100			serverAddr,
101			getTLSClientConfig(),
102			getQuicConfig(qconf),
103		)
104		Expect(err).ToNot(HaveOccurred())
105		runReceivingPeer(client)
106	})
107
108	It(fmt.Sprintf("client and server opening %d streams each and sending data to the peer", numStreams), func() {
109		done1 := make(chan struct{})
110		go func() {
111			defer GinkgoRecover()
112			sess, err := server.Accept(context.Background())
113			Expect(err).ToNot(HaveOccurred())
114			done := make(chan struct{})
115			go func() {
116				defer GinkgoRecover()
117				runReceivingPeer(sess)
118				close(done)
119			}()
120			runSendingPeer(sess)
121			<-done
122			close(done1)
123		}()
124
125		client, err := quic.DialAddr(
126			serverAddr,
127			getTLSClientConfig(),
128			getQuicConfig(qconf),
129		)
130		Expect(err).ToNot(HaveOccurred())
131		done2 := make(chan struct{})
132		go func() {
133			defer GinkgoRecover()
134			runSendingPeer(client)
135			close(done2)
136		}()
137		runReceivingPeer(client)
138		<-done1
139		<-done2
140	})
141})
142