/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package latency provides wrappers for net.Conn, net.Listener, and
// net.Dialers, designed to interoperate to inject real-world latency into
// network connections.
package latency

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"time"

	"golang.org/x/net/context"
)

// Dialer is a function matching the signature of net.Dial. It is the function
// shape accepted and returned by Network.Dialer.
type Dialer func(network, address string) (net.Conn, error)

// TimeoutDialer is a function matching the signature of net.DialTimeout. It
// is the function shape accepted and returned by Network.TimeoutDialer.
type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)

// ContextDialer is a function matching the signature of
// net.Dialer.DialContext. It is the function shape accepted and returned by
// Network.ContextDialer.
type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)

// Network represents a network with the given bandwidth, latency, and MTU
// (Maximum Transmission Unit) configuration, and can produce wrappers of
// net.Listeners, net.Conn, and various forms of dialing functions.  The
// Listeners and Dialers/Conns on both sides of connections must come from this
// package, but need not be created from the same Network.  Latency is computed
// when sending (in Write), and is injected when receiving (in Read).  This
// allows senders' Write calls to be non-blocking, as in real-world
// applications.
53// 54// Note: Latency is injected by the sender specifying the absolute time data 55// should be available, and the reader delaying until that time arrives to 56// provide the data. This package attempts to counter-act the effects of clock 57// drift and existing network latency by measuring the delay between the 58// sender's transmission time and the receiver's reception time during startup. 59// No attempt is made to measure the existing bandwidth of the connection. 60type Network struct { 61 Kbps int // Kilobits per second; if non-positive, infinite 62 Latency time.Duration // One-way latency (sending); if non-positive, no delay 63 MTU int // Bytes per packet; if non-positive, infinite 64} 65 66var ( 67 //Local simulates local network. 68 Local = Network{0, 0, 0} 69 //LAN simulates local area network network. 70 LAN = Network{100 * 1024, 2 * time.Millisecond, 1500} 71 //WAN simulates wide area network. 72 WAN = Network{20 * 1024, 30 * time.Millisecond, 1500} 73 //Longhaul simulates bad network. 74 Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000} 75) 76 77// Conn returns a net.Conn that wraps c and injects n's latency into that 78// connection. This function also imposes latency for connection creation. 79// If n's Latency is lower than the measured latency in c, an error is 80// returned. 81func (n *Network) Conn(c net.Conn) (net.Conn, error) { 82 start := now() 83 nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)} 84 if err := nc.sync(); err != nil { 85 return nil, err 86 } 87 sleep(start.Add(nc.delay).Sub(now())) 88 return nc, nil 89} 90 91type conn struct { 92 net.Conn 93 network *Network 94 95 readBuf *bytes.Buffer // one packet worth of data received 96 lastSendEnd time.Time // time the previous Write should be fully on the wire 97 delay time.Duration // desired latency - measured latency 98} 99 100// header is sent before all data transmitted by the application. 
101type header struct { 102 ReadTime int64 // Time the reader is allowed to read this packet (UnixNano) 103 Sz int32 // Size of the data in the packet 104} 105 106func (c *conn) Write(p []byte) (n int, err error) { 107 tNow := now() 108 if c.lastSendEnd.Before(tNow) { 109 c.lastSendEnd = tNow 110 } 111 for len(p) > 0 { 112 pkt := p 113 if c.network.MTU > 0 && len(pkt) > c.network.MTU { 114 pkt = pkt[:c.network.MTU] 115 p = p[c.network.MTU:] 116 } else { 117 p = nil 118 } 119 if c.network.Kbps > 0 { 120 if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 { 121 // The network is full; sleep until this packet can be sent. 122 sleep(congestion) 123 tNow = tNow.Add(congestion) 124 } 125 } 126 c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt))) 127 hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))} 128 if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil { 129 return n, err 130 } 131 x, err := c.Conn.Write(pkt) 132 n += x 133 if err != nil { 134 return n, err 135 } 136 } 137 return n, nil 138} 139 140func (c *conn) Read(p []byte) (n int, err error) { 141 if c.readBuf.Len() == 0 { 142 var hdr header 143 if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil { 144 return 0, err 145 } 146 defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }() 147 148 if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil { 149 return 0, err 150 } 151 } 152 // Read from readBuf. 153 return c.readBuf.Read(p) 154} 155 156// sync does a handshake and then measures the latency on the network in 157// coordination with the other side. 
158func (c *conn) sync() error { 159 const ( 160 pingMsg = "syncPing" 161 warmup = 10 // minimum number of iterations to measure latency 162 giveUp = 50 // maximum number of iterations to measure latency 163 accuracy = time.Millisecond // req'd accuracy to stop early 164 goodRun = 3 // stop early if latency within accuracy this many times 165 ) 166 167 type syncMsg struct { 168 SendT int64 // Time sent. If zero, stop. 169 RecvT int64 // Time received. If zero, fill in and respond. 170 } 171 172 // A trivial handshake 173 if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil { 174 return err 175 } 176 var ping [8]byte 177 if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil { 178 return err 179 } else if string(ping[:]) != pingMsg { 180 return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg) 181 } 182 183 // Both sides are alive and syncing. Calculate network delay / clock skew. 184 att := 0 185 good := 0 186 var latency time.Duration 187 localDone, remoteDone := false, false 188 send := true 189 for !localDone || !remoteDone { 190 if send { 191 if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil { 192 return err 193 } 194 att++ 195 send = false 196 } 197 198 // Block until we get a syncMsg 199 m := syncMsg{} 200 if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil { 201 return err 202 } 203 204 if m.RecvT == 0 { 205 // Message initiated from other side. 206 if m.SendT == 0 { 207 remoteDone = true 208 continue 209 } 210 // Send response. 
211 m.RecvT = now().UnixNano() 212 if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil { 213 return err 214 } 215 continue 216 } 217 218 lag := time.Duration(m.RecvT - m.SendT) 219 latency += lag 220 avgLatency := latency / time.Duration(att) 221 if e := lag - avgLatency; e > -accuracy && e < accuracy { 222 good++ 223 } else { 224 good = 0 225 } 226 if att < giveUp && (att < warmup || good < goodRun) { 227 send = true 228 continue 229 } 230 localDone = true 231 latency = avgLatency 232 // Tell the other side we're done. 233 if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil { 234 return err 235 } 236 } 237 if c.network.Latency <= 0 { 238 return nil 239 } 240 c.delay = c.network.Latency - latency 241 if c.delay < 0 { 242 return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency) 243 } 244 return nil 245} 246 247// Listener returns a net.Listener that wraps l and injects n's latency in its 248// connections. 249func (n *Network) Listener(l net.Listener) net.Listener { 250 return &listener{Listener: l, network: n} 251} 252 253type listener struct { 254 net.Listener 255 network *Network 256} 257 258func (l *listener) Accept() (net.Conn, error) { 259 c, err := l.Listener.Accept() 260 if err != nil { 261 return nil, err 262 } 263 return l.network.Conn(c) 264} 265 266// Dialer returns a Dialer that wraps d and injects n's latency in its 267// connections. n's Latency is also injected to the connection's creation. 268func (n *Network) Dialer(d Dialer) Dialer { 269 return func(network, address string) (net.Conn, error) { 270 conn, err := d(network, address) 271 if err != nil { 272 return nil, err 273 } 274 return n.Conn(conn) 275 } 276} 277 278// TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency 279// in its connections. n's Latency is also injected to the connection's 280// creation. 
281func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer { 282 return func(network, address string, timeout time.Duration) (net.Conn, error) { 283 conn, err := d(network, address, timeout) 284 if err != nil { 285 return nil, err 286 } 287 return n.Conn(conn) 288 } 289} 290 291// ContextDialer returns a ContextDialer that wraps d and injects n's latency 292// in its connections. n's Latency is also injected to the connection's 293// creation. 294func (n *Network) ContextDialer(d ContextDialer) ContextDialer { 295 return func(ctx context.Context, network, address string) (net.Conn, error) { 296 conn, err := d(ctx, network, address) 297 if err != nil { 298 return nil, err 299 } 300 return n.Conn(conn) 301 } 302} 303 304// pktTime returns the time it takes to transmit one packet of data of size b 305// in bytes. 306func (n *Network) pktTime(b int) time.Duration { 307 if n.Kbps <= 0 { 308 return time.Duration(0) 309 } 310 return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8)) 311} 312 313// Wrappers for testing 314 315var now = time.Now 316var sleep = time.Sleep 317