1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package latency provides wrappers for net.Conn, net.Listener, and
20// net.Dialers, designed to interoperate to inject real-world latency into
21// network connections.
22package latency
23
24import (
25	"bytes"
26	"context"
27	"encoding/binary"
28	"fmt"
29	"io"
30	"net"
31	"time"
32)
33
// Function types for the dialing functions that a Network can wrap.
type (
	// Dialer is a function matching the signature of net.Dial.
	Dialer func(network, address string) (net.Conn, error)

	// TimeoutDialer is a function matching the signature of net.DialTimeout.
	TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)

	// ContextDialer is a function matching the signature of
	// net.Dialer.DialContext.
	ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
)
43
// Network represents a network with the given bandwidth, latency, and MTU
// (Maximum Transmission Unit) configuration, and can produce wrappers of
// net.Listeners, net.Conn, and various forms of dialing functions.  The
// Listeners and Dialers/Conns on both sides of connections must come from this
// package, but need not be created from the same Network.  Latency is computed
// when sending (in Write), and is injected when receiving (in Read).  This
// allows senders' Write calls to be non-blocking, as in real-world
// applications.
//
// Note: Latency is injected by the sender specifying the absolute time data
// should be available, and the reader delaying until that time arrives to
// provide the data.  This package attempts to counteract the effects of clock
// drift and existing network latency by measuring the delay between the
// sender's transmission time and the receiver's reception time during startup.
// No attempt is made to measure the existing bandwidth of the connection.
//
// Kbps is converted to a byte rate as Kbps*1024/8 bytes per second.  The field
// order is significant: the predefined networks in this package are built with
// unkeyed composite literals.
type Network struct {
	Kbps    int           // Kilobits per second; if non-positive, infinite
	Latency time.Duration // One-way latency (sending); if non-positive, no delay
	MTU     int           // Bytes per packet; if non-positive, infinite
}
64
65var (
66	//Local simulates local network.
67	Local = Network{0, 0, 0}
68	//LAN simulates local area network network.
69	LAN = Network{100 * 1024, 2 * time.Millisecond, 1500}
70	//WAN simulates wide area network.
71	WAN = Network{20 * 1024, 30 * time.Millisecond, 1500}
72	//Longhaul simulates bad network.
73	Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000}
74)
75
76// Conn returns a net.Conn that wraps c and injects n's latency into that
77// connection.  This function also imposes latency for connection creation.
78// If n's Latency is lower than the measured latency in c, an error is
79// returned.
80func (n *Network) Conn(c net.Conn) (net.Conn, error) {
81	start := now()
82	nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
83	if err := nc.sync(); err != nil {
84		return nil, err
85	}
86	sleep(start.Add(nc.delay).Sub(now()))
87	return nc, nil
88}
89
// conn is the net.Conn returned by Network.Conn.  It embeds the wrapped
// connection and overrides Read and Write to frame the data into packets and
// inject latency.
type conn struct {
	// The underlying connection; methods other than Read and Write are
	// promoted from it unchanged.
	net.Conn
	// Bandwidth, latency and MTU settings applied to this connection.
	network *Network

	readBuf     *bytes.Buffer // one packet worth of data received
	lastSendEnd time.Time     // time the previous Write should be fully on the wire
	delay       time.Duration // desired latency - measured latency
}
98
// header is sent before all data transmitted by the application.  Write emits
// one header per packet and Read consumes it, both through encoding/binary in
// big-endian order, so the field types and their order define the wire format.
type header struct {
	ReadTime int64 // Time the reader is allowed to read this packet (UnixNano)
	Sz       int32 // Size of the data in the packet
}
104
105func (c *conn) Write(p []byte) (n int, err error) {
106	tNow := now()
107	if c.lastSendEnd.Before(tNow) {
108		c.lastSendEnd = tNow
109	}
110	for len(p) > 0 {
111		pkt := p
112		if c.network.MTU > 0 && len(pkt) > c.network.MTU {
113			pkt = pkt[:c.network.MTU]
114			p = p[c.network.MTU:]
115		} else {
116			p = nil
117		}
118		if c.network.Kbps > 0 {
119			if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
120				// The network is full; sleep until this packet can be sent.
121				sleep(congestion)
122				tNow = tNow.Add(congestion)
123			}
124		}
125		c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
126		hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
127		if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
128			return n, err
129		}
130		x, err := c.Conn.Write(pkt)
131		n += x
132		if err != nil {
133			return n, err
134		}
135	}
136	return n, nil
137}
138
139func (c *conn) Read(p []byte) (n int, err error) {
140	if c.readBuf.Len() == 0 {
141		var hdr header
142		if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
143			return 0, err
144		}
145		defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
146
147		if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
148			return 0, err
149		}
150	}
151	// Read from readBuf.
152	return c.readBuf.Read(p)
153}
154
155// sync does a handshake and then measures the latency on the network in
156// coordination with the other side.
157func (c *conn) sync() error {
158	const (
159		pingMsg  = "syncPing"
160		warmup   = 10               // minimum number of iterations to measure latency
161		giveUp   = 50               // maximum number of iterations to measure latency
162		accuracy = time.Millisecond // req'd accuracy to stop early
163		goodRun  = 3                // stop early if latency within accuracy this many times
164	)
165
166	type syncMsg struct {
167		SendT int64 // Time sent.  If zero, stop.
168		RecvT int64 // Time received.  If zero, fill in and respond.
169	}
170
171	// A trivial handshake
172	if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
173		return err
174	}
175	var ping [8]byte
176	if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
177		return err
178	} else if string(ping[:]) != pingMsg {
179		return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
180	}
181
182	// Both sides are alive and syncing.  Calculate network delay / clock skew.
183	att := 0
184	good := 0
185	var latency time.Duration
186	localDone, remoteDone := false, false
187	send := true
188	for !localDone || !remoteDone {
189		if send {
190			if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
191				return err
192			}
193			att++
194			send = false
195		}
196
197		// Block until we get a syncMsg
198		m := syncMsg{}
199		if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
200			return err
201		}
202
203		if m.RecvT == 0 {
204			// Message initiated from other side.
205			if m.SendT == 0 {
206				remoteDone = true
207				continue
208			}
209			// Send response.
210			m.RecvT = now().UnixNano()
211			if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
212				return err
213			}
214			continue
215		}
216
217		lag := time.Duration(m.RecvT - m.SendT)
218		latency += lag
219		avgLatency := latency / time.Duration(att)
220		if e := lag - avgLatency; e > -accuracy && e < accuracy {
221			good++
222		} else {
223			good = 0
224		}
225		if att < giveUp && (att < warmup || good < goodRun) {
226			send = true
227			continue
228		}
229		localDone = true
230		latency = avgLatency
231		// Tell the other side we're done.
232		if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
233			return err
234		}
235	}
236	if c.network.Latency <= 0 {
237		return nil
238	}
239	c.delay = c.network.Latency - latency
240	if c.delay < 0 {
241		return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
242	}
243	return nil
244}
245
246// Listener returns a net.Listener that wraps l and injects n's latency in its
247// connections.
248func (n *Network) Listener(l net.Listener) net.Listener {
249	return &listener{Listener: l, network: n}
250}
251
// listener is the net.Listener returned by Network.Listener.  It embeds the
// wrapped listener and overrides Accept so that every accepted connection is
// passed through network.Conn.
type listener struct {
	net.Listener
	network *Network
}
256
257func (l *listener) Accept() (net.Conn, error) {
258	c, err := l.Listener.Accept()
259	if err != nil {
260		return nil, err
261	}
262	return l.network.Conn(c)
263}
264
265// Dialer returns a Dialer that wraps d and injects n's latency in its
266// connections.  n's Latency is also injected to the connection's creation.
267func (n *Network) Dialer(d Dialer) Dialer {
268	return func(network, address string) (net.Conn, error) {
269		conn, err := d(network, address)
270		if err != nil {
271			return nil, err
272		}
273		return n.Conn(conn)
274	}
275}
276
277// TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency
278// in its connections.  n's Latency is also injected to the connection's
279// creation.
280func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
281	return func(network, address string, timeout time.Duration) (net.Conn, error) {
282		conn, err := d(network, address, timeout)
283		if err != nil {
284			return nil, err
285		}
286		return n.Conn(conn)
287	}
288}
289
290// ContextDialer returns a ContextDialer that wraps d and injects n's latency
291// in its connections.  n's Latency is also injected to the connection's
292// creation.
293func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
294	return func(ctx context.Context, network, address string) (net.Conn, error) {
295		conn, err := d(ctx, network, address)
296		if err != nil {
297			return nil, err
298		}
299		return n.Conn(conn)
300	}
301}
302
303// pktTime returns the time it takes to transmit one packet of data of size b
304// in bytes.
305func (n *Network) pktTime(b int) time.Duration {
306	if n.Kbps <= 0 {
307		return time.Duration(0)
308	}
309	return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8))
310}
311
// Wrappers for testing: the package reads the clock and sleeps only through
// these variables.
var (
	now   = time.Now
	sleep = time.Sleep
)
316