/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package latency provides wrappers for net.Conn, net.Listener, and
// net.Dialers, designed to interoperate to inject real-world latency into
// network connections.
package latency

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"time"
)

// Dialer is a function matching the signature of net.Dial. It is the type
// accepted and returned by Network.Dialer.
type Dialer func(network, address string) (net.Conn, error)

// TimeoutDialer is a function matching the signature of net.DialTimeout. It
// is the type accepted and returned by Network.TimeoutDialer.
type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)

// ContextDialer is a function matching the signature of
// net.Dialer.DialContext. It is the type accepted and returned by
// Network.ContextDialer.
type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)

// Network represents a network with the given bandwidth, latency, and MTU
// (Maximum Transmission Unit) configuration, and can produce wrappers of
// net.Listeners, net.Conn, and various forms of dialing functions.  The
// Listeners and Dialers/Conns on both sides of connections must come from this
// package, but need not be created from the same Network.  Latency is computed
// when sending (in Write), and is injected when receiving (in Read).  This
// allows senders' Write calls to be non-blocking, as in real-world
// applications.
52// 53// Note: Latency is injected by the sender specifying the absolute time data 54// should be available, and the reader delaying until that time arrives to 55// provide the data. This package attempts to counter-act the effects of clock 56// drift and existing network latency by measuring the delay between the 57// sender's transmission time and the receiver's reception time during startup. 58// No attempt is made to measure the existing bandwidth of the connection. 59type Network struct { 60 Kbps int // Kilobits per second; if non-positive, infinite 61 Latency time.Duration // One-way latency (sending); if non-positive, no delay 62 MTU int // Bytes per packet; if non-positive, infinite 63} 64 65var ( 66 //Local simulates local network. 67 Local = Network{0, 0, 0} 68 //LAN simulates local area network network. 69 LAN = Network{100 * 1024, 2 * time.Millisecond, 1500} 70 //WAN simulates wide area network. 71 WAN = Network{20 * 1024, 30 * time.Millisecond, 1500} 72 //Longhaul simulates bad network. 73 Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000} 74) 75 76// Conn returns a net.Conn that wraps c and injects n's latency into that 77// connection. This function also imposes latency for connection creation. 78// If n's Latency is lower than the measured latency in c, an error is 79// returned. 80func (n *Network) Conn(c net.Conn) (net.Conn, error) { 81 start := now() 82 nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)} 83 if err := nc.sync(); err != nil { 84 return nil, err 85 } 86 sleep(start.Add(nc.delay).Sub(now())) 87 return nc, nil 88} 89 90type conn struct { 91 net.Conn 92 network *Network 93 94 readBuf *bytes.Buffer // one packet worth of data received 95 lastSendEnd time.Time // time the previous Write should be fully on the wire 96 delay time.Duration // desired latency - measured latency 97} 98 99// header is sent before all data transmitted by the application. 
100type header struct { 101 ReadTime int64 // Time the reader is allowed to read this packet (UnixNano) 102 Sz int32 // Size of the data in the packet 103} 104 105func (c *conn) Write(p []byte) (n int, err error) { 106 tNow := now() 107 if c.lastSendEnd.Before(tNow) { 108 c.lastSendEnd = tNow 109 } 110 for len(p) > 0 { 111 pkt := p 112 if c.network.MTU > 0 && len(pkt) > c.network.MTU { 113 pkt = pkt[:c.network.MTU] 114 p = p[c.network.MTU:] 115 } else { 116 p = nil 117 } 118 if c.network.Kbps > 0 { 119 if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 { 120 // The network is full; sleep until this packet can be sent. 121 sleep(congestion) 122 tNow = tNow.Add(congestion) 123 } 124 } 125 c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt))) 126 hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))} 127 if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil { 128 return n, err 129 } 130 x, err := c.Conn.Write(pkt) 131 n += x 132 if err != nil { 133 return n, err 134 } 135 } 136 return n, nil 137} 138 139func (c *conn) Read(p []byte) (n int, err error) { 140 if c.readBuf.Len() == 0 { 141 var hdr header 142 if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil { 143 return 0, err 144 } 145 defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }() 146 147 if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil { 148 return 0, err 149 } 150 } 151 // Read from readBuf. 152 return c.readBuf.Read(p) 153} 154 155// sync does a handshake and then measures the latency on the network in 156// coordination with the other side. 
157func (c *conn) sync() error { 158 const ( 159 pingMsg = "syncPing" 160 warmup = 10 // minimum number of iterations to measure latency 161 giveUp = 50 // maximum number of iterations to measure latency 162 accuracy = time.Millisecond // req'd accuracy to stop early 163 goodRun = 3 // stop early if latency within accuracy this many times 164 ) 165 166 type syncMsg struct { 167 SendT int64 // Time sent. If zero, stop. 168 RecvT int64 // Time received. If zero, fill in and respond. 169 } 170 171 // A trivial handshake 172 if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil { 173 return err 174 } 175 var ping [8]byte 176 if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil { 177 return err 178 } else if string(ping[:]) != pingMsg { 179 return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg) 180 } 181 182 // Both sides are alive and syncing. Calculate network delay / clock skew. 183 att := 0 184 good := 0 185 var latency time.Duration 186 localDone, remoteDone := false, false 187 send := true 188 for !localDone || !remoteDone { 189 if send { 190 if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil { 191 return err 192 } 193 att++ 194 send = false 195 } 196 197 // Block until we get a syncMsg 198 m := syncMsg{} 199 if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil { 200 return err 201 } 202 203 if m.RecvT == 0 { 204 // Message initiated from other side. 205 if m.SendT == 0 { 206 remoteDone = true 207 continue 208 } 209 // Send response. 
210 m.RecvT = now().UnixNano() 211 if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil { 212 return err 213 } 214 continue 215 } 216 217 lag := time.Duration(m.RecvT - m.SendT) 218 latency += lag 219 avgLatency := latency / time.Duration(att) 220 if e := lag - avgLatency; e > -accuracy && e < accuracy { 221 good++ 222 } else { 223 good = 0 224 } 225 if att < giveUp && (att < warmup || good < goodRun) { 226 send = true 227 continue 228 } 229 localDone = true 230 latency = avgLatency 231 // Tell the other side we're done. 232 if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil { 233 return err 234 } 235 } 236 if c.network.Latency <= 0 { 237 return nil 238 } 239 c.delay = c.network.Latency - latency 240 if c.delay < 0 { 241 return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency) 242 } 243 return nil 244} 245 246// Listener returns a net.Listener that wraps l and injects n's latency in its 247// connections. 248func (n *Network) Listener(l net.Listener) net.Listener { 249 return &listener{Listener: l, network: n} 250} 251 252type listener struct { 253 net.Listener 254 network *Network 255} 256 257func (l *listener) Accept() (net.Conn, error) { 258 c, err := l.Listener.Accept() 259 if err != nil { 260 return nil, err 261 } 262 return l.network.Conn(c) 263} 264 265// Dialer returns a Dialer that wraps d and injects n's latency in its 266// connections. n's Latency is also injected to the connection's creation. 267func (n *Network) Dialer(d Dialer) Dialer { 268 return func(network, address string) (net.Conn, error) { 269 conn, err := d(network, address) 270 if err != nil { 271 return nil, err 272 } 273 return n.Conn(conn) 274 } 275} 276 277// TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency 278// in its connections. n's Latency is also injected to the connection's 279// creation. 
280func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer { 281 return func(network, address string, timeout time.Duration) (net.Conn, error) { 282 conn, err := d(network, address, timeout) 283 if err != nil { 284 return nil, err 285 } 286 return n.Conn(conn) 287 } 288} 289 290// ContextDialer returns a ContextDialer that wraps d and injects n's latency 291// in its connections. n's Latency is also injected to the connection's 292// creation. 293func (n *Network) ContextDialer(d ContextDialer) ContextDialer { 294 return func(ctx context.Context, network, address string) (net.Conn, error) { 295 conn, err := d(ctx, network, address) 296 if err != nil { 297 return nil, err 298 } 299 return n.Conn(conn) 300 } 301} 302 303// pktTime returns the time it takes to transmit one packet of data of size b 304// in bytes. 305func (n *Network) pktTime(b int) time.Duration { 306 if n.Kbps <= 0 { 307 return time.Duration(0) 308 } 309 return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8)) 310} 311 312// Wrappers for testing 313 314var now = time.Now 315var sleep = time.Sleep 316