1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package latency 20 21import ( 22 "bytes" 23 "fmt" 24 "net" 25 "reflect" 26 "sync" 27 "testing" 28 "time" 29) 30 31// bufConn is a net.Conn implemented by a bytes.Buffer (which is a ReadWriter). 32type bufConn struct { 33 *bytes.Buffer 34} 35 36func (bufConn) Close() error { panic("unimplemented") } 37func (bufConn) LocalAddr() net.Addr { panic("unimplemented") } 38func (bufConn) RemoteAddr() net.Addr { panic("unimplemented") } 39func (bufConn) SetDeadline(t time.Time) error { panic("unimplemneted") } 40func (bufConn) SetReadDeadline(t time.Time) error { panic("unimplemneted") } 41func (bufConn) SetWriteDeadline(t time.Time) error { panic("unimplemneted") } 42 43func restoreHooks() func() { 44 s := sleep 45 n := now 46 return func() { 47 sleep = s 48 now = n 49 } 50} 51 52func TestConn(t *testing.T) { 53 defer restoreHooks()() 54 55 // Constant time. 56 now = func() time.Time { return time.Unix(123, 456) } 57 58 // Capture sleep times for checking later. 59 var sleepTimes []time.Duration 60 sleep = func(t time.Duration) { sleepTimes = append(sleepTimes, t) } 61 62 wantSleeps := func(want ...time.Duration) { 63 if !reflect.DeepEqual(want, sleepTimes) { 64 t.Fatalf("sleepTimes = %v; want %v", sleepTimes, want) 65 } 66 sleepTimes = nil 67 } 68 69 // Use a fairly high latency to cause a large BDP and avoid sleeps while 70 // writing due to simulation of full buffers. 71 latency := 1 * time.Second 72 c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}}) 73 if err != nil { 74 t.Fatalf("Unexpected error creating connection: %v", err) 75 } 76 wantSleeps(latency) // Connection creation delay. 77 78 // 1 kbps = 128 Bps. Divides evenly by 1 second using nanos. 79 byteLatency := time.Duration(time.Second / 128) 80 81 write := func(b []byte) { 82 n, err := c.Write(b) 83 if n != len(b) || err != nil { 84 t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b)) 85 } 86 } 87 88 write([]byte{1, 2, 3, 4, 5}) // One full packet 89 pkt1Time := latency + byteLatency*5 90 write([]byte{6}) // One partial packet 91 pkt2Time := pkt1Time + byteLatency 92 write([]byte{7, 8, 9, 10, 11, 12, 13}) // Two packets 93 pkt3Time := pkt2Time + byteLatency*5 94 pkt4Time := pkt3Time + byteLatency*2 95 96 // No reads, so no sleeps yet. 97 wantSleeps() 98 99 read := func(n int, want []byte) { 100 b := make([]byte, n) 101 if rd, err := c.Read(b); err != nil || rd != len(want) { 102 t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil", n, rd, err, len(want)) 103 } 104 if !reflect.DeepEqual(b[:len(want)], want) { 105 t.Fatalf("read %v; want %v", b, want) 106 } 107 } 108 109 read(1, []byte{1}) 110 wantSleeps(pkt1Time) 111 read(1, []byte{2}) 112 wantSleeps() 113 read(3, []byte{3, 4, 5}) 114 wantSleeps() 115 read(2, []byte{6}) 116 wantSleeps(pkt2Time) 117 read(2, []byte{7, 8}) 118 wantSleeps(pkt3Time) 119 read(10, []byte{9, 10, 11}) 120 wantSleeps() 121 read(10, []byte{12, 13}) 122 wantSleeps(pkt4Time) 123} 124 125func TestSync(t *testing.T) { 126 defer restoreHooks()() 127 128 // Infinitely fast CPU: time doesn't pass unless sleep is called. 129 tn := time.Unix(123, 0) 130 now = func() time.Time { return tn } 131 sleep = func(d time.Duration) { tn = tn.Add(d) } 132 133 // Simulate a 20ms latency network, then run sync across that and expect to 134 // measure 20ms latency, or 10ms additional delay for a 30ms network. 135 slowConn, err := (&Network{Kbps: 0, Latency: 20 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}}) 136 if err != nil { 137 t.Fatalf("Unexpected error creating connection: %v", err) 138 } 139 c, err := (&Network{Latency: 30 * time.Millisecond}).Conn(slowConn) 140 if err != nil { 141 t.Fatalf("Unexpected error creating connection: %v", err) 142 } 143 if c.(*conn).delay != 10*time.Millisecond { 144 t.Fatalf("c.delay = %v; want 10ms", c.(*conn).delay) 145 } 146} 147 148func TestSyncTooSlow(t *testing.T) { 149 defer restoreHooks()() 150 151 // Infinitely fast CPU: time doesn't pass unless sleep is called. 152 tn := time.Unix(123, 0) 153 now = func() time.Time { return tn } 154 sleep = func(d time.Duration) { tn = tn.Add(d) } 155 156 // Simulate a 10ms latency network, then attempt to simulate a 5ms latency 157 // network and expect an error. 158 slowConn, err := (&Network{Kbps: 0, Latency: 10 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}}) 159 if err != nil { 160 t.Fatalf("Unexpected error creating connection: %v", err) 161 } 162 163 errWant := "measured network latency (10ms) higher than desired latency (5ms)" 164 if _, err := (&Network{Latency: 5 * time.Millisecond}).Conn(slowConn); err == nil || err.Error() != errWant { 165 t.Fatalf("Conn() = _, %q; want _, %q", err, errWant) 166 } 167} 168 169func TestListenerAndDialer(t *testing.T) { 170 defer restoreHooks()() 171 172 tn := time.Unix(123, 0) 173 startTime := tn 174 mu := &sync.Mutex{} 175 now = func() time.Time { 176 mu.Lock() 177 defer mu.Unlock() 178 return tn 179 } 180 181 // Use a fairly high latency to cause a large BDP and avoid sleeps while 182 // writing due to simulation of full buffers. 183 n := &Network{Kbps: 2, Latency: 1 * time.Second, MTU: 10} 184 // 2 kbps = .25 kBps = 256 Bps 185 byteLatency := func(n int) time.Duration { 186 return time.Duration(n) * time.Second / 256 187 } 188 189 // Create a real listener and wrap it. 190 l, err := net.Listen("tcp", "localhost:0") 191 if err != nil { 192 t.Fatalf("Unexpected error creating listener: %v", err) 193 } 194 defer l.Close() 195 l = n.Listener(l) 196 197 var serverConn net.Conn 198 var scErr error 199 scDone := make(chan struct{}) 200 go func() { 201 serverConn, scErr = l.Accept() 202 close(scDone) 203 }() 204 205 // Create a dialer and use it. 206 clientConn, err := n.TimeoutDialer(net.DialTimeout)("tcp", l.Addr().String(), 2*time.Second) 207 if err != nil { 208 t.Fatalf("Unexpected error dialing: %v", err) 209 } 210 defer clientConn.Close() 211 212 // Block until server's Conn is available. 213 <-scDone 214 if scErr != nil { 215 t.Fatalf("Unexpected error listening: %v", scErr) 216 } 217 defer serverConn.Close() 218 219 // sleep (only) advances tn. Done after connections established so sync detects zero delay. 220 sleep = func(d time.Duration) { 221 mu.Lock() 222 defer mu.Unlock() 223 if d > 0 { 224 tn = tn.Add(d) 225 } 226 } 227 228 seq := func(a, b int) []byte { 229 buf := make([]byte, b-a) 230 for i := 0; i < b-a; i++ { 231 buf[i] = byte(i + a) 232 } 233 return buf 234 } 235 236 pkt1 := seq(0, 10) 237 pkt2 := seq(10, 30) 238 pkt3 := seq(30, 35) 239 240 write := func(c net.Conn, b []byte) { 241 n, err := c.Write(b) 242 if n != len(b) || err != nil { 243 t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b)) 244 } 245 } 246 247 write(serverConn, pkt1) 248 write(serverConn, pkt2) 249 write(serverConn, pkt3) 250 write(clientConn, pkt3) 251 write(clientConn, pkt1) 252 write(clientConn, pkt2) 253 254 if tn != startTime { 255 t.Fatalf("unexpected sleep in write; tn = %v; want %v", tn, startTime) 256 } 257 258 read := func(c net.Conn, n int, want []byte, timeWant time.Time) { 259 b := make([]byte, n) 260 if rd, err := c.Read(b); err != nil || rd != len(want) { 261 t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil (read: %v)", n, rd, err, len(want), b[:rd]) 262 } 263 if !reflect.DeepEqual(b[:len(want)], want) { 264 t.Fatalf("read %v; want %v", b, want) 265 } 266 if !tn.Equal(timeWant) { 267 t.Errorf("tn after read(%v) = %v; want %v", want, tn, timeWant) 268 } 269 } 270 271 read(clientConn, len(pkt1)+1, pkt1, startTime.Add(n.Latency+byteLatency(len(pkt1)))) 272 read(serverConn, len(pkt3)+1, pkt3, tn) // tn was advanced by the above read; pkt3 is shorter than pkt1 273 274 read(clientConn, len(pkt2), pkt2[:10], startTime.Add(n.Latency+byteLatency(len(pkt1)+10))) 275 read(clientConn, len(pkt2), pkt2[10:], startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)))) 276 read(clientConn, len(pkt3), pkt3, startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)+len(pkt3)))) 277 278 read(serverConn, len(pkt1), pkt1, tn) // tn already past the arrival time due to prior reads 279 read(serverConn, len(pkt2), pkt2[:10], tn) 280 read(serverConn, len(pkt2), pkt2[10:], tn) 281 282 // Sleep awhile and make sure the read happens disregarding previous writes 283 // (lastSendEnd handling). 284 sleep(10 * time.Second) 285 write(clientConn, pkt1) 286 read(serverConn, len(pkt1), pkt1, tn.Add(n.Latency+byteLatency(len(pkt1)))) 287 288 // Send, sleep longer than the network delay, then make sure the read happens 289 // instantly. 290 write(serverConn, pkt1) 291 sleep(10 * time.Second) 292 read(clientConn, len(pkt1), pkt1, tn) 293} 294 295func TestBufferBloat(t *testing.T) { 296 defer restoreHooks()() 297 298 // Infinitely fast CPU: time doesn't pass unless sleep is called. 299 tn := time.Unix(123, 0) 300 now = func() time.Time { return tn } 301 // Capture sleep times for checking later. 302 var sleepTimes []time.Duration 303 sleep = func(d time.Duration) { 304 sleepTimes = append(sleepTimes, d) 305 tn = tn.Add(d) 306 } 307 308 wantSleeps := func(want ...time.Duration) error { 309 if !reflect.DeepEqual(want, sleepTimes) { 310 return fmt.Errorf("sleepTimes = %v; want %v", sleepTimes, want) 311 } 312 sleepTimes = nil 313 return nil 314 } 315 316 n := &Network{Kbps: 8 /* 1KBps */, Latency: time.Second, MTU: 8} 317 bdpBytes := (n.Kbps * 1024 / 8) * int(n.Latency/time.Second) // 1024 318 c, err := n.Conn(bufConn{&bytes.Buffer{}}) 319 if err != nil { 320 t.Fatalf("Unexpected error creating connection: %v", err) 321 } 322 wantSleeps(n.Latency) // Connection creation delay. 323 324 write := func(n int, sleeps ...time.Duration) { 325 if wt, err := c.Write(make([]byte, n)); err != nil || wt != n { 326 t.Fatalf("c.Write(<%v bytes>) = %v, %v; want %v, nil", n, wt, err, n) 327 } 328 if err := wantSleeps(sleeps...); err != nil { 329 t.Fatalf("After writing %v bytes: %v", n, err) 330 } 331 } 332 333 read := func(n int, sleeps ...time.Duration) { 334 if rd, err := c.Read(make([]byte, n)); err != nil || rd != n { 335 t.Fatalf("c.Read(_) = %v, %v; want %v, nil", rd, err, n) 336 } 337 if err := wantSleeps(sleeps...); err != nil { 338 t.Fatalf("After reading %v bytes: %v", n, err) 339 } 340 } 341 342 write(8) // No reads and buffer not full, so no sleeps yet. 343 read(8, time.Second+n.pktTime(8)) 344 345 write(bdpBytes) // Fill the buffer. 346 write(1) // We can send one extra packet even when the buffer is full. 347 write(n.MTU, n.pktTime(1)) // Make sure we sleep to clear the previous write. 348 write(1, n.pktTime(n.MTU)) 349 write(n.MTU+1, n.pktTime(1), n.pktTime(n.MTU)) 350 351 tn = tn.Add(10 * time.Second) // Wait long enough for the buffer to clear. 352 write(bdpBytes) // No sleeps required. 353} 354