1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package latency
20
21import (
22	"bytes"
23	"fmt"
24	"net"
25	"reflect"
26	"sync"
27	"testing"
28	"time"
29)
30
31// bufConn is a net.Conn implemented by a bytes.Buffer (which is a ReadWriter).
32type bufConn struct {
33	*bytes.Buffer
34}
35
36func (bufConn) Close() error                       { panic("unimplemented") }
37func (bufConn) LocalAddr() net.Addr                { panic("unimplemented") }
38func (bufConn) RemoteAddr() net.Addr               { panic("unimplemented") }
39func (bufConn) SetDeadline(t time.Time) error      { panic("unimplemneted") }
40func (bufConn) SetReadDeadline(t time.Time) error  { panic("unimplemneted") }
41func (bufConn) SetWriteDeadline(t time.Time) error { panic("unimplemneted") }
42
43func restoreHooks() func() {
44	s := sleep
45	n := now
46	return func() {
47		sleep = s
48		now = n
49	}
50}
51
52func TestConn(t *testing.T) {
53	defer restoreHooks()()
54
55	// Constant time.
56	now = func() time.Time { return time.Unix(123, 456) }
57
58	// Capture sleep times for checking later.
59	var sleepTimes []time.Duration
60	sleep = func(t time.Duration) { sleepTimes = append(sleepTimes, t) }
61
62	wantSleeps := func(want ...time.Duration) {
63		if !reflect.DeepEqual(want, sleepTimes) {
64			t.Fatalf("sleepTimes = %v; want %v", sleepTimes, want)
65		}
66		sleepTimes = nil
67	}
68
69	// Use a fairly high latency to cause a large BDP and avoid sleeps while
70	// writing due to simulation of full buffers.
71	latency := 1 * time.Second
72	c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
73	if err != nil {
74		t.Fatalf("Unexpected error creating connection: %v", err)
75	}
76	wantSleeps(latency) // Connection creation delay.
77
78	// 1 kbps = 128 Bps.  Divides evenly by 1 second using nanos.
79	byteLatency := time.Duration(time.Second / 128)
80
81	write := func(b []byte) {
82		n, err := c.Write(b)
83		if n != len(b) || err != nil {
84			t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
85		}
86	}
87
88	write([]byte{1, 2, 3, 4, 5}) // One full packet
89	pkt1Time := latency + byteLatency*5
90	write([]byte{6}) // One partial packet
91	pkt2Time := pkt1Time + byteLatency
92	write([]byte{7, 8, 9, 10, 11, 12, 13}) // Two packets
93	pkt3Time := pkt2Time + byteLatency*5
94	pkt4Time := pkt3Time + byteLatency*2
95
96	// No reads, so no sleeps yet.
97	wantSleeps()
98
99	read := func(n int, want []byte) {
100		b := make([]byte, n)
101		if rd, err := c.Read(b); err != nil || rd != len(want) {
102			t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil", n, rd, err, len(want))
103		}
104		if !reflect.DeepEqual(b[:len(want)], want) {
105			t.Fatalf("read %v; want %v", b, want)
106		}
107	}
108
109	read(1, []byte{1})
110	wantSleeps(pkt1Time)
111	read(1, []byte{2})
112	wantSleeps()
113	read(3, []byte{3, 4, 5})
114	wantSleeps()
115	read(2, []byte{6})
116	wantSleeps(pkt2Time)
117	read(2, []byte{7, 8})
118	wantSleeps(pkt3Time)
119	read(10, []byte{9, 10, 11})
120	wantSleeps()
121	read(10, []byte{12, 13})
122	wantSleeps(pkt4Time)
123}
124
125func TestSync(t *testing.T) {
126	defer restoreHooks()()
127
128	// Infinitely fast CPU: time doesn't pass unless sleep is called.
129	tn := time.Unix(123, 0)
130	now = func() time.Time { return tn }
131	sleep = func(d time.Duration) { tn = tn.Add(d) }
132
133	// Simulate a 20ms latency network, then run sync across that and expect to
134	// measure 20ms latency, or 10ms additional delay for a 30ms network.
135	slowConn, err := (&Network{Kbps: 0, Latency: 20 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
136	if err != nil {
137		t.Fatalf("Unexpected error creating connection: %v", err)
138	}
139	c, err := (&Network{Latency: 30 * time.Millisecond}).Conn(slowConn)
140	if err != nil {
141		t.Fatalf("Unexpected error creating connection: %v", err)
142	}
143	if c.(*conn).delay != 10*time.Millisecond {
144		t.Fatalf("c.delay = %v; want 10ms", c.(*conn).delay)
145	}
146}
147
148func TestSyncTooSlow(t *testing.T) {
149	defer restoreHooks()()
150
151	// Infinitely fast CPU: time doesn't pass unless sleep is called.
152	tn := time.Unix(123, 0)
153	now = func() time.Time { return tn }
154	sleep = func(d time.Duration) { tn = tn.Add(d) }
155
156	// Simulate a 10ms latency network, then attempt to simulate a 5ms latency
157	// network and expect an error.
158	slowConn, err := (&Network{Kbps: 0, Latency: 10 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
159	if err != nil {
160		t.Fatalf("Unexpected error creating connection: %v", err)
161	}
162
163	errWant := "measured network latency (10ms) higher than desired latency (5ms)"
164	if _, err := (&Network{Latency: 5 * time.Millisecond}).Conn(slowConn); err == nil || err.Error() != errWant {
165		t.Fatalf("Conn() = _, %q; want _, %q", err, errWant)
166	}
167}
168
169func TestListenerAndDialer(t *testing.T) {
170	defer restoreHooks()()
171
172	tn := time.Unix(123, 0)
173	startTime := tn
174	mu := &sync.Mutex{}
175	now = func() time.Time {
176		mu.Lock()
177		defer mu.Unlock()
178		return tn
179	}
180
181	// Use a fairly high latency to cause a large BDP and avoid sleeps while
182	// writing due to simulation of full buffers.
183	n := &Network{Kbps: 2, Latency: 1 * time.Second, MTU: 10}
184	// 2 kbps = .25 kBps = 256 Bps
185	byteLatency := func(n int) time.Duration {
186		return time.Duration(n) * time.Second / 256
187	}
188
189	// Create a real listener and wrap it.
190	l, err := net.Listen("tcp", "localhost:0")
191	if err != nil {
192		t.Fatalf("Unexpected error creating listener: %v", err)
193	}
194	defer l.Close()
195	l = n.Listener(l)
196
197	var serverConn net.Conn
198	var scErr error
199	scDone := make(chan struct{})
200	go func() {
201		serverConn, scErr = l.Accept()
202		close(scDone)
203	}()
204
205	// Create a dialer and use it.
206	clientConn, err := n.TimeoutDialer(net.DialTimeout)("tcp", l.Addr().String(), 2*time.Second)
207	if err != nil {
208		t.Fatalf("Unexpected error dialing: %v", err)
209	}
210	defer clientConn.Close()
211
212	// Block until server's Conn is available.
213	<-scDone
214	if scErr != nil {
215		t.Fatalf("Unexpected error listening: %v", scErr)
216	}
217	defer serverConn.Close()
218
219	// sleep (only) advances tn.   Done after connections established so sync detects zero delay.
220	sleep = func(d time.Duration) {
221		mu.Lock()
222		defer mu.Unlock()
223		if d > 0 {
224			tn = tn.Add(d)
225		}
226	}
227
228	seq := func(a, b int) []byte {
229		buf := make([]byte, b-a)
230		for i := 0; i < b-a; i++ {
231			buf[i] = byte(i + a)
232		}
233		return buf
234	}
235
236	pkt1 := seq(0, 10)
237	pkt2 := seq(10, 30)
238	pkt3 := seq(30, 35)
239
240	write := func(c net.Conn, b []byte) {
241		n, err := c.Write(b)
242		if n != len(b) || err != nil {
243			t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
244		}
245	}
246
247	write(serverConn, pkt1)
248	write(serverConn, pkt2)
249	write(serverConn, pkt3)
250	write(clientConn, pkt3)
251	write(clientConn, pkt1)
252	write(clientConn, pkt2)
253
254	if tn != startTime {
255		t.Fatalf("unexpected sleep in write; tn = %v; want %v", tn, startTime)
256	}
257
258	read := func(c net.Conn, n int, want []byte, timeWant time.Time) {
259		b := make([]byte, n)
260		if rd, err := c.Read(b); err != nil || rd != len(want) {
261			t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil (read: %v)", n, rd, err, len(want), b[:rd])
262		}
263		if !reflect.DeepEqual(b[:len(want)], want) {
264			t.Fatalf("read %v; want %v", b, want)
265		}
266		if !tn.Equal(timeWant) {
267			t.Errorf("tn after read(%v) = %v; want %v", want, tn, timeWant)
268		}
269	}
270
271	read(clientConn, len(pkt1)+1, pkt1, startTime.Add(n.Latency+byteLatency(len(pkt1))))
272	read(serverConn, len(pkt3)+1, pkt3, tn) // tn was advanced by the above read; pkt3 is shorter than pkt1
273
274	read(clientConn, len(pkt2), pkt2[:10], startTime.Add(n.Latency+byteLatency(len(pkt1)+10)))
275	read(clientConn, len(pkt2), pkt2[10:], startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2))))
276	read(clientConn, len(pkt3), pkt3, startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)+len(pkt3))))
277
278	read(serverConn, len(pkt1), pkt1, tn) // tn already past the arrival time due to prior reads
279	read(serverConn, len(pkt2), pkt2[:10], tn)
280	read(serverConn, len(pkt2), pkt2[10:], tn)
281
282	// Sleep awhile and make sure the read happens disregarding previous writes
283	// (lastSendEnd handling).
284	sleep(10 * time.Second)
285	write(clientConn, pkt1)
286	read(serverConn, len(pkt1), pkt1, tn.Add(n.Latency+byteLatency(len(pkt1))))
287
288	// Send, sleep longer than the network delay, then make sure the read happens
289	// instantly.
290	write(serverConn, pkt1)
291	sleep(10 * time.Second)
292	read(clientConn, len(pkt1), pkt1, tn)
293}
294
295func TestBufferBloat(t *testing.T) {
296	defer restoreHooks()()
297
298	// Infinitely fast CPU: time doesn't pass unless sleep is called.
299	tn := time.Unix(123, 0)
300	now = func() time.Time { return tn }
301	// Capture sleep times for checking later.
302	var sleepTimes []time.Duration
303	sleep = func(d time.Duration) {
304		sleepTimes = append(sleepTimes, d)
305		tn = tn.Add(d)
306	}
307
308	wantSleeps := func(want ...time.Duration) error {
309		if !reflect.DeepEqual(want, sleepTimes) {
310			return fmt.Errorf("sleepTimes = %v; want %v", sleepTimes, want)
311		}
312		sleepTimes = nil
313		return nil
314	}
315
316	n := &Network{Kbps: 8 /* 1KBps */, Latency: time.Second, MTU: 8}
317	bdpBytes := (n.Kbps * 1024 / 8) * int(n.Latency/time.Second) // 1024
318	c, err := n.Conn(bufConn{&bytes.Buffer{}})
319	if err != nil {
320		t.Fatalf("Unexpected error creating connection: %v", err)
321	}
322	wantSleeps(n.Latency) // Connection creation delay.
323
324	write := func(n int, sleeps ...time.Duration) {
325		if wt, err := c.Write(make([]byte, n)); err != nil || wt != n {
326			t.Fatalf("c.Write(<%v bytes>) = %v, %v; want %v, nil", n, wt, err, n)
327		}
328		if err := wantSleeps(sleeps...); err != nil {
329			t.Fatalf("After writing %v bytes: %v", n, err)
330		}
331	}
332
333	read := func(n int, sleeps ...time.Duration) {
334		if rd, err := c.Read(make([]byte, n)); err != nil || rd != n {
335			t.Fatalf("c.Read(_) = %v, %v; want %v, nil", rd, err, n)
336		}
337		if err := wantSleeps(sleeps...); err != nil {
338			t.Fatalf("After reading %v bytes: %v", n, err)
339		}
340	}
341
342	write(8) // No reads and buffer not full, so no sleeps yet.
343	read(8, time.Second+n.pktTime(8))
344
345	write(bdpBytes)            // Fill the buffer.
346	write(1)                   // We can send one extra packet even when the buffer is full.
347	write(n.MTU, n.pktTime(1)) // Make sure we sleep to clear the previous write.
348	write(1, n.pktTime(n.MTU))
349	write(n.MTU+1, n.pktTime(1), n.pktTime(n.MTU))
350
351	tn = tn.Add(10 * time.Second) // Wait long enough for the buffer to clear.
352	write(bdpBytes)               // No sleeps required.
353}
354