1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package latency provides wrappers for net.Conn, net.Listener, and
20// net.Dialers, designed to interoperate to inject real-world latency into
21// network connections.
22package latency
23
24import (
25	"bytes"
26	"encoding/binary"
27	"fmt"
28	"io"
29	"net"
30	"time"
31
32	"golang.org/x/net/context"
33)
34
35// Dialer is a function matching the signature of net.Dial.
36type Dialer func(network, address string) (net.Conn, error)
37
38// TimeoutDialer is a function matching the signature of net.DialTimeout.
39type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)
40
41// ContextDialer is a function matching the signature of
42// net.Dialer.DialContext.
43type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
44
45// Network represents a network with the given bandwidth, latency, and MTU
46// (Maximum Transmission Unit) configuration, and can produce wrappers of
47// net.Listeners, net.Conn, and various forms of dialing functions.  The
48// Listeners and Dialers/Conns on both sides of connections must come from this
49// package, but need not be created from the same Network.  Latency is computed
50// when sending (in Write), and is injected when receiving (in Read).  This
51// allows senders' Write calls to be non-blocking, as in real-world
52// applications.
53//
54// Note: Latency is injected by the sender specifying the absolute time data
55// should be available, and the reader delaying until that time arrives to
56// provide the data.  This package attempts to counter-act the effects of clock
57// drift and existing network latency by measuring the delay between the
58// sender's transmission time and the receiver's reception time during startup.
59// No attempt is made to measure the existing bandwidth of the connection.
60type Network struct {
61	Kbps    int           // Kilobits per second; if non-positive, infinite
62	Latency time.Duration // One-way latency (sending); if non-positive, no delay
63	MTU     int           // Bytes per packet; if non-positive, infinite
64}
65
66var (
67	//Local simulates local network.
68	Local = Network{0, 0, 0}
69	//LAN simulates local area network network.
70	LAN = Network{100 * 1024, 2 * time.Millisecond, 1500}
71	//WAN simulates wide area network.
72	WAN = Network{20 * 1024, 30 * time.Millisecond, 1500}
73	//Longhaul simulates bad network.
74	Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000}
75)
76
77// Conn returns a net.Conn that wraps c and injects n's latency into that
78// connection.  This function also imposes latency for connection creation.
79// If n's Latency is lower than the measured latency in c, an error is
80// returned.
81func (n *Network) Conn(c net.Conn) (net.Conn, error) {
82	start := now()
83	nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
84	if err := nc.sync(); err != nil {
85		return nil, err
86	}
87	sleep(start.Add(nc.delay).Sub(now()))
88	return nc, nil
89}
90
91type conn struct {
92	net.Conn
93	network *Network
94
95	readBuf     *bytes.Buffer // one packet worth of data received
96	lastSendEnd time.Time     // time the previous Write should be fully on the wire
97	delay       time.Duration // desired latency - measured latency
98}
99
100// header is sent before all data transmitted by the application.
101type header struct {
102	ReadTime int64 // Time the reader is allowed to read this packet (UnixNano)
103	Sz       int32 // Size of the data in the packet
104}
105
106func (c *conn) Write(p []byte) (n int, err error) {
107	tNow := now()
108	if c.lastSendEnd.Before(tNow) {
109		c.lastSendEnd = tNow
110	}
111	for len(p) > 0 {
112		pkt := p
113		if c.network.MTU > 0 && len(pkt) > c.network.MTU {
114			pkt = pkt[:c.network.MTU]
115			p = p[c.network.MTU:]
116		} else {
117			p = nil
118		}
119		if c.network.Kbps > 0 {
120			if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
121				// The network is full; sleep until this packet can be sent.
122				sleep(congestion)
123				tNow = tNow.Add(congestion)
124			}
125		}
126		c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
127		hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
128		if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
129			return n, err
130		}
131		x, err := c.Conn.Write(pkt)
132		n += x
133		if err != nil {
134			return n, err
135		}
136	}
137	return n, nil
138}
139
140func (c *conn) Read(p []byte) (n int, err error) {
141	if c.readBuf.Len() == 0 {
142		var hdr header
143		if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
144			return 0, err
145		}
146		defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
147
148		if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
149			return 0, err
150		}
151	}
152	// Read from readBuf.
153	return c.readBuf.Read(p)
154}
155
156// sync does a handshake and then measures the latency on the network in
157// coordination with the other side.
158func (c *conn) sync() error {
159	const (
160		pingMsg  = "syncPing"
161		warmup   = 10               // minimum number of iterations to measure latency
162		giveUp   = 50               // maximum number of iterations to measure latency
163		accuracy = time.Millisecond // req'd accuracy to stop early
164		goodRun  = 3                // stop early if latency within accuracy this many times
165	)
166
167	type syncMsg struct {
168		SendT int64 // Time sent.  If zero, stop.
169		RecvT int64 // Time received.  If zero, fill in and respond.
170	}
171
172	// A trivial handshake
173	if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
174		return err
175	}
176	var ping [8]byte
177	if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
178		return err
179	} else if string(ping[:]) != pingMsg {
180		return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
181	}
182
183	// Both sides are alive and syncing.  Calculate network delay / clock skew.
184	att := 0
185	good := 0
186	var latency time.Duration
187	localDone, remoteDone := false, false
188	send := true
189	for !localDone || !remoteDone {
190		if send {
191			if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
192				return err
193			}
194			att++
195			send = false
196		}
197
198		// Block until we get a syncMsg
199		m := syncMsg{}
200		if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
201			return err
202		}
203
204		if m.RecvT == 0 {
205			// Message initiated from other side.
206			if m.SendT == 0 {
207				remoteDone = true
208				continue
209			}
210			// Send response.
211			m.RecvT = now().UnixNano()
212			if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
213				return err
214			}
215			continue
216		}
217
218		lag := time.Duration(m.RecvT - m.SendT)
219		latency += lag
220		avgLatency := latency / time.Duration(att)
221		if e := lag - avgLatency; e > -accuracy && e < accuracy {
222			good++
223		} else {
224			good = 0
225		}
226		if att < giveUp && (att < warmup || good < goodRun) {
227			send = true
228			continue
229		}
230		localDone = true
231		latency = avgLatency
232		// Tell the other side we're done.
233		if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
234			return err
235		}
236	}
237	if c.network.Latency <= 0 {
238		return nil
239	}
240	c.delay = c.network.Latency - latency
241	if c.delay < 0 {
242		return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
243	}
244	return nil
245}
246
247// Listener returns a net.Listener that wraps l and injects n's latency in its
248// connections.
249func (n *Network) Listener(l net.Listener) net.Listener {
250	return &listener{Listener: l, network: n}
251}
252
253type listener struct {
254	net.Listener
255	network *Network
256}
257
258func (l *listener) Accept() (net.Conn, error) {
259	c, err := l.Listener.Accept()
260	if err != nil {
261		return nil, err
262	}
263	return l.network.Conn(c)
264}
265
266// Dialer returns a Dialer that wraps d and injects n's latency in its
267// connections.  n's Latency is also injected to the connection's creation.
268func (n *Network) Dialer(d Dialer) Dialer {
269	return func(network, address string) (net.Conn, error) {
270		conn, err := d(network, address)
271		if err != nil {
272			return nil, err
273		}
274		return n.Conn(conn)
275	}
276}
277
278// TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency
279// in its connections.  n's Latency is also injected to the connection's
280// creation.
281func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
282	return func(network, address string, timeout time.Duration) (net.Conn, error) {
283		conn, err := d(network, address, timeout)
284		if err != nil {
285			return nil, err
286		}
287		return n.Conn(conn)
288	}
289}
290
291// ContextDialer returns a ContextDialer that wraps d and injects n's latency
292// in its connections.  n's Latency is also injected to the connection's
293// creation.
294func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
295	return func(ctx context.Context, network, address string) (net.Conn, error) {
296		conn, err := d(ctx, network, address)
297		if err != nil {
298			return nil, err
299		}
300		return n.Conn(conn)
301	}
302}
303
304// pktTime returns the time it takes to transmit one packet of data of size b
305// in bytes.
306func (n *Network) pktTime(b int) time.Duration {
307	if n.Kbps <= 0 {
308		return time.Duration(0)
309	}
310	return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8))
311}
312
313// Wrappers for testing
314
315var now = time.Now
316var sleep = time.Sleep
317