1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package latency
20
21import (
22	"bytes"
23	"fmt"
24	"net"
25	"reflect"
26	"sync"
27	"testing"
28	"time"
29
30	"google.golang.org/grpc/internal/grpctest"
31)
32
33type s struct {
34	grpctest.Tester
35}
36
37func Test(t *testing.T) {
38	grpctest.RunSubTests(t, s{})
39}
40
// bufConn is a net.Conn implemented by a bytes.Buffer (which is a ReadWriter).
// Read and Write are promoted from the embedded buffer; every other net.Conn
// method is a stub that panics with "unimplemented".
type bufConn struct {
	*bytes.Buffer
}

// Compile-time check that bufConn satisfies net.Conn.
var _ net.Conn = bufConn{}

func (bufConn) Close() error                       { panic("unimplemented") }
func (bufConn) LocalAddr() net.Addr                { panic("unimplemented") }
func (bufConn) RemoteAddr() net.Addr               { panic("unimplemented") }
func (bufConn) SetDeadline(t time.Time) error      { panic("unimplemented") }
func (bufConn) SetReadDeadline(t time.Time) error  { panic("unimplemented") }
func (bufConn) SetWriteDeadline(t time.Time) error { panic("unimplemented") }
52
53func restoreHooks() func() {
54	s := sleep
55	n := now
56	return func() {
57		sleep = s
58		now = n
59	}
60}
61
62func (s) TestConn(t *testing.T) {
63	defer restoreHooks()()
64
65	// Constant time.
66	now = func() time.Time { return time.Unix(123, 456) }
67
68	// Capture sleep times for checking later.
69	var sleepTimes []time.Duration
70	sleep = func(t time.Duration) { sleepTimes = append(sleepTimes, t) }
71
72	wantSleeps := func(want ...time.Duration) {
73		if !reflect.DeepEqual(want, sleepTimes) {
74			t.Fatalf("sleepTimes = %v; want %v", sleepTimes, want)
75		}
76		sleepTimes = nil
77	}
78
79	// Use a fairly high latency to cause a large BDP and avoid sleeps while
80	// writing due to simulation of full buffers.
81	latency := 1 * time.Second
82	c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
83	if err != nil {
84		t.Fatalf("Unexpected error creating connection: %v", err)
85	}
86	wantSleeps(latency) // Connection creation delay.
87
88	// 1 kbps = 128 Bps.  Divides evenly by 1 second using nanos.
89	byteLatency := time.Duration(time.Second / 128)
90
91	write := func(b []byte) {
92		n, err := c.Write(b)
93		if n != len(b) || err != nil {
94			t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
95		}
96	}
97
98	write([]byte{1, 2, 3, 4, 5}) // One full packet
99	pkt1Time := latency + byteLatency*5
100	write([]byte{6}) // One partial packet
101	pkt2Time := pkt1Time + byteLatency
102	write([]byte{7, 8, 9, 10, 11, 12, 13}) // Two packets
103	pkt3Time := pkt2Time + byteLatency*5
104	pkt4Time := pkt3Time + byteLatency*2
105
106	// No reads, so no sleeps yet.
107	wantSleeps()
108
109	read := func(n int, want []byte) {
110		b := make([]byte, n)
111		if rd, err := c.Read(b); err != nil || rd != len(want) {
112			t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil", n, rd, err, len(want))
113		}
114		if !reflect.DeepEqual(b[:len(want)], want) {
115			t.Fatalf("read %v; want %v", b, want)
116		}
117	}
118
119	read(1, []byte{1})
120	wantSleeps(pkt1Time)
121	read(1, []byte{2})
122	wantSleeps()
123	read(3, []byte{3, 4, 5})
124	wantSleeps()
125	read(2, []byte{6})
126	wantSleeps(pkt2Time)
127	read(2, []byte{7, 8})
128	wantSleeps(pkt3Time)
129	read(10, []byte{9, 10, 11})
130	wantSleeps()
131	read(10, []byte{12, 13})
132	wantSleeps(pkt4Time)
133}
134
135func (s) TestSync(t *testing.T) {
136	defer restoreHooks()()
137
138	// Infinitely fast CPU: time doesn't pass unless sleep is called.
139	tn := time.Unix(123, 0)
140	now = func() time.Time { return tn }
141	sleep = func(d time.Duration) { tn = tn.Add(d) }
142
143	// Simulate a 20ms latency network, then run sync across that and expect to
144	// measure 20ms latency, or 10ms additional delay for a 30ms network.
145	slowConn, err := (&Network{Kbps: 0, Latency: 20 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
146	if err != nil {
147		t.Fatalf("Unexpected error creating connection: %v", err)
148	}
149	c, err := (&Network{Latency: 30 * time.Millisecond}).Conn(slowConn)
150	if err != nil {
151		t.Fatalf("Unexpected error creating connection: %v", err)
152	}
153	if c.(*conn).delay != 10*time.Millisecond {
154		t.Fatalf("c.delay = %v; want 10ms", c.(*conn).delay)
155	}
156}
157
158func (s) TestSyncTooSlow(t *testing.T) {
159	defer restoreHooks()()
160
161	// Infinitely fast CPU: time doesn't pass unless sleep is called.
162	tn := time.Unix(123, 0)
163	now = func() time.Time { return tn }
164	sleep = func(d time.Duration) { tn = tn.Add(d) }
165
166	// Simulate a 10ms latency network, then attempt to simulate a 5ms latency
167	// network and expect an error.
168	slowConn, err := (&Network{Kbps: 0, Latency: 10 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
169	if err != nil {
170		t.Fatalf("Unexpected error creating connection: %v", err)
171	}
172
173	errWant := "measured network latency (10ms) higher than desired latency (5ms)"
174	if _, err := (&Network{Latency: 5 * time.Millisecond}).Conn(slowConn); err == nil || err.Error() != errWant {
175		t.Fatalf("Conn() = _, %q; want _, %q", err, errWant)
176	}
177}
178
179func (s) TestListenerAndDialer(t *testing.T) {
180	defer restoreHooks()()
181
182	tn := time.Unix(123, 0)
183	startTime := tn
184	mu := &sync.Mutex{}
185	now = func() time.Time {
186		mu.Lock()
187		defer mu.Unlock()
188		return tn
189	}
190
191	// Use a fairly high latency to cause a large BDP and avoid sleeps while
192	// writing due to simulation of full buffers.
193	n := &Network{Kbps: 2, Latency: 1 * time.Second, MTU: 10}
194	// 2 kbps = .25 kBps = 256 Bps
195	byteLatency := func(n int) time.Duration {
196		return time.Duration(n) * time.Second / 256
197	}
198
199	// Create a real listener and wrap it.
200	l, err := net.Listen("tcp", "localhost:0")
201	if err != nil {
202		t.Fatalf("Unexpected error creating listener: %v", err)
203	}
204	defer l.Close()
205	l = n.Listener(l)
206
207	var serverConn net.Conn
208	var scErr error
209	scDone := make(chan struct{})
210	go func() {
211		serverConn, scErr = l.Accept()
212		close(scDone)
213	}()
214
215	// Create a dialer and use it.
216	clientConn, err := n.TimeoutDialer(net.DialTimeout)("tcp", l.Addr().String(), 2*time.Second)
217	if err != nil {
218		t.Fatalf("Unexpected error dialing: %v", err)
219	}
220	defer clientConn.Close()
221
222	// Block until server's Conn is available.
223	<-scDone
224	if scErr != nil {
225		t.Fatalf("Unexpected error listening: %v", scErr)
226	}
227	defer serverConn.Close()
228
229	// sleep (only) advances tn.   Done after connections established so sync detects zero delay.
230	sleep = func(d time.Duration) {
231		mu.Lock()
232		defer mu.Unlock()
233		if d > 0 {
234			tn = tn.Add(d)
235		}
236	}
237
238	seq := func(a, b int) []byte {
239		buf := make([]byte, b-a)
240		for i := 0; i < b-a; i++ {
241			buf[i] = byte(i + a)
242		}
243		return buf
244	}
245
246	pkt1 := seq(0, 10)
247	pkt2 := seq(10, 30)
248	pkt3 := seq(30, 35)
249
250	write := func(c net.Conn, b []byte) {
251		n, err := c.Write(b)
252		if n != len(b) || err != nil {
253			t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
254		}
255	}
256
257	write(serverConn, pkt1)
258	write(serverConn, pkt2)
259	write(serverConn, pkt3)
260	write(clientConn, pkt3)
261	write(clientConn, pkt1)
262	write(clientConn, pkt2)
263
264	if tn != startTime {
265		t.Fatalf("unexpected sleep in write; tn = %v; want %v", tn, startTime)
266	}
267
268	read := func(c net.Conn, n int, want []byte, timeWant time.Time) {
269		b := make([]byte, n)
270		if rd, err := c.Read(b); err != nil || rd != len(want) {
271			t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil (read: %v)", n, rd, err, len(want), b[:rd])
272		}
273		if !reflect.DeepEqual(b[:len(want)], want) {
274			t.Fatalf("read %v; want %v", b, want)
275		}
276		if !tn.Equal(timeWant) {
277			t.Errorf("tn after read(%v) = %v; want %v", want, tn, timeWant)
278		}
279	}
280
281	read(clientConn, len(pkt1)+1, pkt1, startTime.Add(n.Latency+byteLatency(len(pkt1))))
282	read(serverConn, len(pkt3)+1, pkt3, tn) // tn was advanced by the above read; pkt3 is shorter than pkt1
283
284	read(clientConn, len(pkt2), pkt2[:10], startTime.Add(n.Latency+byteLatency(len(pkt1)+10)))
285	read(clientConn, len(pkt2), pkt2[10:], startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2))))
286	read(clientConn, len(pkt3), pkt3, startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)+len(pkt3))))
287
288	read(serverConn, len(pkt1), pkt1, tn) // tn already past the arrival time due to prior reads
289	read(serverConn, len(pkt2), pkt2[:10], tn)
290	read(serverConn, len(pkt2), pkt2[10:], tn)
291
292	// Sleep awhile and make sure the read happens disregarding previous writes
293	// (lastSendEnd handling).
294	sleep(10 * time.Second)
295	write(clientConn, pkt1)
296	read(serverConn, len(pkt1), pkt1, tn.Add(n.Latency+byteLatency(len(pkt1))))
297
298	// Send, sleep longer than the network delay, then make sure the read happens
299	// instantly.
300	write(serverConn, pkt1)
301	sleep(10 * time.Second)
302	read(clientConn, len(pkt1), pkt1, tn)
303}
304
305func (s) TestBufferBloat(t *testing.T) {
306	defer restoreHooks()()
307
308	// Infinitely fast CPU: time doesn't pass unless sleep is called.
309	tn := time.Unix(123, 0)
310	now = func() time.Time { return tn }
311	// Capture sleep times for checking later.
312	var sleepTimes []time.Duration
313	sleep = func(d time.Duration) {
314		sleepTimes = append(sleepTimes, d)
315		tn = tn.Add(d)
316	}
317
318	wantSleeps := func(want ...time.Duration) error {
319		if !reflect.DeepEqual(want, sleepTimes) {
320			return fmt.Errorf("sleepTimes = %v; want %v", sleepTimes, want)
321		}
322		sleepTimes = nil
323		return nil
324	}
325
326	n := &Network{Kbps: 8 /* 1KBps */, Latency: time.Second, MTU: 8}
327	bdpBytes := (n.Kbps * 1024 / 8) * int(n.Latency/time.Second) // 1024
328	c, err := n.Conn(bufConn{&bytes.Buffer{}})
329	if err != nil {
330		t.Fatalf("Unexpected error creating connection: %v", err)
331	}
332	wantSleeps(n.Latency) // Connection creation delay.
333
334	write := func(n int, sleeps ...time.Duration) {
335		if wt, err := c.Write(make([]byte, n)); err != nil || wt != n {
336			t.Fatalf("c.Write(<%v bytes>) = %v, %v; want %v, nil", n, wt, err, n)
337		}
338		if err := wantSleeps(sleeps...); err != nil {
339			t.Fatalf("After writing %v bytes: %v", n, err)
340		}
341	}
342
343	read := func(n int, sleeps ...time.Duration) {
344		if rd, err := c.Read(make([]byte, n)); err != nil || rd != n {
345			t.Fatalf("c.Read(_) = %v, %v; want %v, nil", rd, err, n)
346		}
347		if err := wantSleeps(sleeps...); err != nil {
348			t.Fatalf("After reading %v bytes: %v", n, err)
349		}
350	}
351
352	write(8) // No reads and buffer not full, so no sleeps yet.
353	read(8, time.Second+n.pktTime(8))
354
355	write(bdpBytes)            // Fill the buffer.
356	write(1)                   // We can send one extra packet even when the buffer is full.
357	write(n.MTU, n.pktTime(1)) // Make sure we sleep to clear the previous write.
358	write(1, n.pktTime(n.MTU))
359	write(n.MTU+1, n.pktTime(1), n.pktTime(n.MTU))
360
361	tn = tn.Add(10 * time.Second) // Wait long enough for the buffer to clear.
362	write(bdpBytes)               // No sleeps required.
363}
364