1// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package proxy
16
17import (
18	"fmt"
19	"io"
20	mrand "math/rand"
21	"net"
22	"net/http"
23	"net/url"
24	"strconv"
25	"strings"
26	"sync"
27	"time"
28
29	"go.etcd.io/etcd/pkg/transport"
30
31	humanize "github.com/dustin/go-humanize"
32	"go.uber.org/zap"
33)
34
// Package-level defaults. NewServer substitutes these when the matching
// ServerConfig field is left at its zero value:
//   - defaultDialTimeout replaces a zero DialTimeout,
//   - defaultBufferSize replaces a zero BufferSize (size in bytes of the
//     per-direction copy buffer allocated in ioCopy),
//   - defaultRetryInterval replaces a zero RetryInterval,
//   - defaultLogger replaces a nil Logger; it is created in init.
var (
	defaultDialTimeout   = 3 * time.Second
	defaultBufferSize    = 48 * 1024
	defaultRetryInterval = 10 * time.Millisecond
	defaultLogger        *zap.Logger
)
41
42func init() {
43	var err error
44	defaultLogger, err = zap.NewProduction()
45	if err != nil {
46		panic(err)
47	}
48}
49
// Server defines a proxy server layer that simulates common network faults:
// latency spikes and packet drop or corruption. The proxy overhead is very
// small (<500μs per request). Please run tests to compute the actual
// overhead.
type Server interface {
	// From returns proxy source address in "scheme://host:port" format.
	From() string
	// To returns proxy destination address in "scheme://host:port" format.
	To() string

	// Ready returns a channel that is closed when the proxy is ready to serve.
	Ready() <-chan struct{}
	// Done returns a channel that is closed when the proxy has been closed.
	Done() <-chan struct{}
	// Error returns a channel on which errors encountered while serving
	// are delivered.
	Error() <-chan error
	// Close closes the listener and stops the proxy.
	Close() error

	// PauseAccept stops accepting new connections.
	PauseAccept()
	// UnpauseAccept removes pause operation on accepting new connections.
	UnpauseAccept()

	// DelayAccept adds latency ± random variable to accepting
	// new incoming connections.
	DelayAccept(latency, rv time.Duration)
	// UndelayAccept removes accept latencies.
	UndelayAccept()
	// LatencyAccept returns current latency on accepting
	// new incoming connections.
	LatencyAccept() time.Duration

	// DelayTx adds latency ± random variable for "outgoing" traffic
	// in "sending" layer.
	DelayTx(latency, rv time.Duration)
	// UndelayTx removes sending latencies.
	UndelayTx()
	// LatencyTx returns current send latency.
	LatencyTx() time.Duration

	// DelayRx adds latency ± random variable for "incoming" traffic
	// in "receiving" layer.
	DelayRx(latency, rv time.Duration)
	// UndelayRx removes "receiving" latencies.
	UndelayRx()
	// LatencyRx returns current receive latency.
	LatencyRx() time.Duration

	// ModifyTx alters/corrupts/drops "outgoing" packets from the listener
	// with the given edit function.
	ModifyTx(f func(data []byte) []byte)
	// UnmodifyTx removes modify operation on "forwarding".
	UnmodifyTx()

	// ModifyRx alters/corrupts/drops "incoming" packets to client
	// with the given edit function.
	ModifyRx(f func(data []byte) []byte)
	// UnmodifyRx removes modify operation on "receiving".
	UnmodifyRx()

	// BlackholeTx drops all "outgoing" packets before "forwarding".
	// "BlackholeTx" operation is a wrapper around "ModifyTx" with
	// a function that returns empty bytes.
	BlackholeTx()
	// UnblackholeTx removes blackhole operation on "sending".
	UnblackholeTx()

	// BlackholeRx drops all "incoming" packets to client.
	// "BlackholeRx" operation is a wrapper around "ModifyRx" with
	// a function that returns empty bytes.
	BlackholeRx()
	// UnblackholeRx removes blackhole operation on "receiving".
	UnblackholeRx()

	// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
	PauseTx()
	// UnpauseTx removes "forwarding" pause operation.
	UnpauseTx()

	// PauseRx stops "receiving" packets; "incoming" traffic blocks.
	PauseRx()
	// UnpauseRx removes "receiving" pause operation.
	UnpauseRx()

	// ResetListener closes and restarts listener.
	ResetListener() error
}
138
// ServerConfig defines proxy server configuration.
//
// Fields, as consumed by NewServer:
//   - Logger: destination for proxy logs; nil selects the package default
//     production logger.
//   - From: address the proxy listens on. A scheme starting with "http" is
//     rewritten to "tcp".
//   - To: address the proxy forwards to. A scheme starting with "http" is
//     rewritten to "tcp".
//   - TLSInfo: when non-empty, the listener is created through
//     transport.NewListener and outgoing connections are dialed through a
//     transport built from it.
//   - DialTimeout: passed to transport.NewTransport for TLS dials; zero
//     selects the 3s default.
//   - BufferSize: size in bytes of each per-direction copy buffer; zero
//     selects the 48KiB default.
//   - RetryInterval: wait before re-creating a closed listener; zero
//     selects the 10ms default.
type ServerConfig struct {
	Logger        *zap.Logger
	From          url.URL
	To            url.URL
	TLSInfo       transport.TLSInfo
	DialTimeout   time.Duration
	BufferSize    int
	RetryInterval time.Duration
}
149
// server is the Server implementation returned by NewServer. Each piece of
// mutable fault-injection state is paired with its own mutex.
type server struct {
	// lg receives all proxy log output.
	lg *zap.Logger

	// from/to are the listen and forward addresses; fromPort/toPort hold the
	// numeric port parsed from the respective Host, or 0 when it could not
	// be parsed.
	from     url.URL
	fromPort int
	to       url.URL
	toPort   int

	// tlsInfo, when non-empty, switches listening and dialing to TLS.
	tlsInfo     transport.TLSInfo
	dialTimeout time.Duration

	// bufferSize is the per-direction copy buffer size in bytes;
	// retryInterval is the wait before resetting a closed listener.
	bufferSize    int
	retryInterval time.Duration

	// readyc is closed when the accept loop starts, donec is closed by
	// Close, and errc carries serving errors (buffered).
	readyc chan struct{}
	donec  chan struct{}
	errc   chan error

	// closeOnce makes shutdown run once; closeWg tracks the accept loop.
	closeOnce sync.Once
	closeWg   sync.WaitGroup

	// listenerMu guards listener, which ResetListener may replace.
	listenerMu sync.RWMutex
	listener   net.Listener

	// pauseAcceptc is closed while accepting is allowed and open (never
	// closed yet) while accepting is paused.
	pauseAcceptMu sync.Mutex
	pauseAcceptc  chan struct{}

	// latencyAccept is the delay applied before each Accept; 0 disables it.
	latencyAcceptMu sync.RWMutex
	latencyAccept   time.Duration

	// modifyTx, when non-nil, rewrites each chunk of outgoing data.
	modifyTxMu sync.RWMutex
	modifyTx   func(data []byte) []byte

	// modifyRx, when non-nil, rewrites each chunk of incoming data.
	modifyRxMu sync.RWMutex
	modifyRx   func(data []byte) []byte

	// pauseTxc is closed while forwarding is allowed, open while paused.
	pauseTxMu sync.Mutex
	pauseTxc  chan struct{}

	// pauseRxc is closed while receiving is allowed, open while paused.
	pauseRxMu sync.Mutex
	pauseRxc  chan struct{}

	// latencyTx is the delay applied before each outgoing write; 0 disables it.
	latencyTxMu sync.RWMutex
	latencyTx   time.Duration

	// latencyRx is the delay applied before each incoming write; 0 disables it.
	latencyRxMu sync.RWMutex
	latencyRx   time.Duration
}
198
199// NewServer returns a proxy implementation with no iptables/tc dependencies.
200// The proxy layer overhead is <1ms.
201func NewServer(cfg ServerConfig) Server {
202	s := &server{
203		lg: cfg.Logger,
204
205		from: cfg.From,
206		to:   cfg.To,
207
208		tlsInfo:     cfg.TLSInfo,
209		dialTimeout: cfg.DialTimeout,
210
211		bufferSize:    cfg.BufferSize,
212		retryInterval: cfg.RetryInterval,
213
214		readyc: make(chan struct{}),
215		donec:  make(chan struct{}),
216		errc:   make(chan error, 16),
217
218		pauseAcceptc: make(chan struct{}),
219		pauseTxc:     make(chan struct{}),
220		pauseRxc:     make(chan struct{}),
221	}
222
223	_, fromPort, err := net.SplitHostPort(cfg.From.Host)
224	if err == nil {
225		s.fromPort, _ = strconv.Atoi(fromPort)
226	}
227	var toPort string
228	_, toPort, err = net.SplitHostPort(cfg.To.Host)
229	if err == nil {
230		s.toPort, _ = strconv.Atoi(toPort)
231	}
232
233	if s.dialTimeout == 0 {
234		s.dialTimeout = defaultDialTimeout
235	}
236	if s.bufferSize == 0 {
237		s.bufferSize = defaultBufferSize
238	}
239	if s.retryInterval == 0 {
240		s.retryInterval = defaultRetryInterval
241	}
242	if s.lg == nil {
243		s.lg = defaultLogger
244	}
245
246	close(s.pauseAcceptc)
247	close(s.pauseTxc)
248	close(s.pauseRxc)
249
250	if strings.HasPrefix(s.from.Scheme, "http") {
251		s.from.Scheme = "tcp"
252	}
253	if strings.HasPrefix(s.to.Scheme, "http") {
254		s.to.Scheme = "tcp"
255	}
256
257	addr := fmt.Sprintf(":%d", s.fromPort)
258	if s.fromPort == 0 { // unix
259		addr = s.from.Host
260	}
261
262	var ln net.Listener
263	if !s.tlsInfo.Empty() {
264		ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
265	} else {
266		ln, err = net.Listen(s.from.Scheme, addr)
267	}
268	if err != nil {
269		s.errc <- err
270		s.Close()
271		return s
272	}
273	s.listener = ln
274
275	s.closeWg.Add(1)
276	go s.listenAndServe()
277
278	s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To()))
279	return s
280}
281
282func (s *server) From() string {
283	return fmt.Sprintf("%s://%s", s.from.Scheme, s.from.Host)
284}
285
286func (s *server) To() string {
287	return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
288}
289
290// TODO: implement packet reordering from multiple TCP connections
291// buffer packets per connection for awhile, reorder before transmit
292// - https://github.com/etcd-io/etcd/issues/5614
293// - https://github.com/etcd-io/etcd/pull/6918#issuecomment-264093034
294
295func (s *server) listenAndServe() {
296	defer s.closeWg.Done()
297
298	s.lg.Info("proxy is listening on", zap.String("from", s.From()))
299	close(s.readyc)
300
301	for {
302		s.pauseAcceptMu.Lock()
303		pausec := s.pauseAcceptc
304		s.pauseAcceptMu.Unlock()
305		select {
306		case <-pausec:
307		case <-s.donec:
308			return
309		}
310
311		s.latencyAcceptMu.RLock()
312		lat := s.latencyAccept
313		s.latencyAcceptMu.RUnlock()
314		if lat > 0 {
315			select {
316			case <-time.After(lat):
317			case <-s.donec:
318				return
319			}
320		}
321
322		s.listenerMu.RLock()
323		ln := s.listener
324		s.listenerMu.RUnlock()
325
326		in, err := ln.Accept()
327		if err != nil {
328			select {
329			case s.errc <- err:
330				select {
331				case <-s.donec:
332					return
333				default:
334				}
335			case <-s.donec:
336				return
337			}
338			s.lg.Debug("listener accept error", zap.Error(err))
339
340			if strings.HasSuffix(err.Error(), "use of closed network connection") {
341				select {
342				case <-time.After(s.retryInterval):
343				case <-s.donec:
344					return
345				}
346				s.lg.Debug("listener is closed; retry listening on", zap.String("from", s.From()))
347
348				if err = s.ResetListener(); err != nil {
349					select {
350					case s.errc <- err:
351						select {
352						case <-s.donec:
353							return
354						default:
355						}
356					case <-s.donec:
357						return
358					}
359					s.lg.Warn("failed to reset listener", zap.Error(err))
360				}
361			}
362
363			continue
364		}
365
366		var out net.Conn
367		if !s.tlsInfo.Empty() {
368			var tp *http.Transport
369			tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout)
370			if err != nil {
371				select {
372				case s.errc <- err:
373					select {
374					case <-s.donec:
375						return
376					default:
377					}
378				case <-s.donec:
379					return
380				}
381				continue
382			}
383			out, err = tp.Dial(s.to.Scheme, s.to.Host)
384		} else {
385			out, err = net.Dial(s.to.Scheme, s.to.Host)
386		}
387		if err != nil {
388			select {
389			case s.errc <- err:
390				select {
391				case <-s.donec:
392					return
393				default:
394				}
395			case <-s.donec:
396				return
397			}
398			s.lg.Debug("failed to dial", zap.Error(err))
399			continue
400		}
401
402		go func() {
403			// read incoming bytes from listener, dispatch to outgoing connection
404			s.transmit(out, in)
405			out.Close()
406			in.Close()
407		}()
408		go func() {
409			// read response from outgoing connection, write back to listener
410			s.receive(in, out)
411			in.Close()
412			out.Close()
413		}()
414	}
415}
416
417func (s *server) transmit(dst io.Writer, src io.Reader) {
418	s.ioCopy(dst, src, proxyTx)
419}
420
421func (s *server) receive(dst io.Writer, src io.Reader) {
422	s.ioCopy(dst, src, proxyRx)
423}
424
// proxyType identifies the direction of a copy performed by ioCopy.
type proxyType uint8

const (
	// proxyTx marks "outgoing" traffic (listener side to destination).
	proxyTx proxyType = 0
	// proxyRx marks "incoming" traffic (destination back to listener side).
	proxyRx proxyType = 1
)
431
// ioCopy copies data from src to dst in one direction until a read or write
// fails, a read returns zero bytes, or the proxy is closed.
//
// ptype selects which set of fault-injection settings (tx or rx) applies.
// For every chunk read, in this order:
//  1. the modify function for the direction, if set, rewrites the chunk;
//  2. the copy blocks while the direction is paused;
//  3. a chunk that became empty is dropped (nothing is written);
//  4. the configured latency for the direction is waited out;
//  5. the chunk is written to dst.
//
// Errors other than EOF and the "connection closed/reset" read errors are
// sent to errc. An unknown ptype panics.
func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
	// one buffer per direction per connection, reused for every read
	buf := make([]byte, s.bufferSize)
	for {
		nr1, err := src.Read(buf)
		if err != nil {
			if err == io.EOF {
				return
			}
			// connection already closed
			if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
				return
			}
			if strings.HasSuffix(err.Error(), "use of closed network connection") {
				return
			}
			// report the error unless the proxy is (or gets) closed
			select {
			case s.errc <- err:
				select {
				case <-s.donec:
					return
				default:
				}
			case <-s.donec:
				return
			}
			s.lg.Debug("failed to read", zap.Error(err))
			return
		}
		// a zero-byte read without error also ends the copy
		if nr1 == 0 {
			return
		}
		data := buf[:nr1]

		// alters/corrupts/drops data
		switch ptype {
		case proxyTx:
			s.modifyTxMu.RLock()
			if s.modifyTx != nil {
				data = s.modifyTx(data)
			}
			s.modifyTxMu.RUnlock()
		case proxyRx:
			s.modifyRxMu.RLock()
			if s.modifyRx != nil {
				data = s.modifyRx(data)
			}
			s.modifyRxMu.RUnlock()
		default:
			panic("unknown proxy type")
		}
		// nr2 is the chunk size after modification; 0 means "drop"
		nr2 := len(data)
		switch ptype {
		case proxyTx:
			s.lg.Debug(
				"modified tx",
				zap.String("data-received", humanize.Bytes(uint64(nr1))),
				zap.String("data-modified", humanize.Bytes(uint64(nr2))),
				zap.String("from", s.From()),
				zap.String("to", s.To()),
			)
		case proxyRx:
			s.lg.Debug(
				"modified rx",
				zap.String("data-received", humanize.Bytes(uint64(nr1))),
				zap.String("data-modified", humanize.Bytes(uint64(nr2))),
				zap.String("from", s.To()),
				zap.String("to", s.From()),
			)
		default:
			panic("unknown proxy type")
		}

		// pause before packet dropping, blocking, and forwarding
		var pausec chan struct{}
		switch ptype {
		case proxyTx:
			s.pauseTxMu.Lock()
			pausec = s.pauseTxc
			s.pauseTxMu.Unlock()
		case proxyRx:
			s.pauseRxMu.Lock()
			pausec = s.pauseRxc
			s.pauseRxMu.Unlock()
		default:
			panic("unknown proxy type")
		}
		// a closed pausec means "not paused" and falls through immediately
		select {
		case <-pausec:
		case <-s.donec:
			return
		}

		// pause first, and then drop packets
		if nr2 == 0 {
			continue
		}

		// block before forwarding
		var lat time.Duration
		switch ptype {
		case proxyTx:
			s.latencyTxMu.RLock()
			lat = s.latencyTx
			s.latencyTxMu.RUnlock()
		case proxyRx:
			s.latencyRxMu.RLock()
			lat = s.latencyRx
			s.latencyRxMu.RUnlock()
		default:
			panic("unknown proxy type")
		}
		if lat > 0 {
			select {
			case <-time.After(lat):
			case <-s.donec:
				return
			}
		}

		// now forward packets to target
		var nw int
		nw, err = dst.Write(data)
		if err != nil {
			if err == io.EOF {
				return
			}
			// report the error unless the proxy is (or gets) closed
			select {
			case s.errc <- err:
				select {
				case <-s.donec:
					return
				default:
				}
			case <-s.donec:
				return
			}
			switch ptype {
			case proxyTx:
				s.lg.Debug("write fail on tx", zap.Error(err))
			case proxyRx:
				s.lg.Debug("write fail on rx", zap.Error(err))
			default:
				panic("unknown proxy type")
			}
			return
		}

		// a short write (fewer bytes than the modified chunk) is an error
		if nr2 != nw {
			select {
			case s.errc <- io.ErrShortWrite:
				select {
				case <-s.donec:
					return
				default:
				}
			case <-s.donec:
				return
			}
			// NOTE(review): "read-bytes" logs nr1 (bytes read) although the
			// check above compares nr2 (bytes after modification) with nw.
			switch ptype {
			case proxyTx:
				s.lg.Debug(
					"write fail on tx; read/write bytes are different",
					zap.Int("read-bytes", nr1),
					zap.Int("write-bytes", nw),
					zap.Error(io.ErrShortWrite),
				)
			case proxyRx:
				s.lg.Debug(
					"write fail on rx; read/write bytes are different",
					zap.Int("read-bytes", nr1),
					zap.Int("write-bytes", nw),
					zap.Error(io.ErrShortWrite),
				)
			default:
				panic("unknown proxy type")
			}
			return
		}

		// NOTE(review): "data-size" below is nr1, the size before modification.
		switch ptype {
		case proxyTx:
			s.lg.Debug(
				"transmitted",
				zap.String("data-size", humanize.Bytes(uint64(nr1))),
				zap.String("from", s.From()),
				zap.String("to", s.To()),
			)
		case proxyRx:
			s.lg.Debug(
				"received",
				zap.String("data-size", humanize.Bytes(uint64(nr1))),
				zap.String("from", s.To()),
				zap.String("to", s.From()),
			)
		default:
			panic("unknown proxy type")
		}
	}
}
631
632func (s *server) Ready() <-chan struct{} { return s.readyc }
633func (s *server) Done() <-chan struct{}  { return s.donec }
634func (s *server) Error() <-chan error    { return s.errc }
635func (s *server) Close() (err error) {
636	s.closeOnce.Do(func() {
637		close(s.donec)
638		s.listenerMu.Lock()
639		if s.listener != nil {
640			err = s.listener.Close()
641			s.lg.Info(
642				"closed proxy listener",
643				zap.String("from", s.From()),
644				zap.String("to", s.To()),
645			)
646		}
647		s.lg.Sync()
648		s.listenerMu.Unlock()
649	})
650	s.closeWg.Wait()
651	return err
652}
653
654func (s *server) PauseAccept() {
655	s.pauseAcceptMu.Lock()
656	s.pauseAcceptc = make(chan struct{})
657	s.pauseAcceptMu.Unlock()
658
659	s.lg.Info(
660		"paused accept",
661		zap.String("from", s.From()),
662		zap.String("to", s.To()),
663	)
664}
665
666func (s *server) UnpauseAccept() {
667	s.pauseAcceptMu.Lock()
668	select {
669	case <-s.pauseAcceptc: // already unpaused
670	case <-s.donec:
671		s.pauseAcceptMu.Unlock()
672		return
673	default:
674		close(s.pauseAcceptc)
675	}
676	s.pauseAcceptMu.Unlock()
677
678	s.lg.Info(
679		"unpaused accept",
680		zap.String("from", s.From()),
681		zap.String("to", s.To()),
682	)
683}
684
685func (s *server) DelayAccept(latency, rv time.Duration) {
686	if latency <= 0 {
687		return
688	}
689	d := computeLatency(latency, rv)
690	s.latencyAcceptMu.Lock()
691	s.latencyAccept = d
692	s.latencyAcceptMu.Unlock()
693
694	s.lg.Info(
695		"set accept latency",
696		zap.Duration("latency", d),
697		zap.Duration("given-latency", latency),
698		zap.Duration("given-latency-random-variable", rv),
699		zap.String("from", s.From()),
700		zap.String("to", s.To()),
701	)
702}
703
704func (s *server) UndelayAccept() {
705	s.latencyAcceptMu.Lock()
706	d := s.latencyAccept
707	s.latencyAccept = 0
708	s.latencyAcceptMu.Unlock()
709
710	s.lg.Info(
711		"removed accept latency",
712		zap.Duration("latency", d),
713		zap.String("from", s.From()),
714		zap.String("to", s.To()),
715	)
716}
717
718func (s *server) LatencyAccept() time.Duration {
719	s.latencyAcceptMu.RLock()
720	d := s.latencyAccept
721	s.latencyAcceptMu.RUnlock()
722	return d
723}
724
725func (s *server) DelayTx(latency, rv time.Duration) {
726	if latency <= 0 {
727		return
728	}
729	d := computeLatency(latency, rv)
730	s.latencyTxMu.Lock()
731	s.latencyTx = d
732	s.latencyTxMu.Unlock()
733
734	s.lg.Info(
735		"set transmit latency",
736		zap.Duration("latency", d),
737		zap.Duration("given-latency", latency),
738		zap.Duration("given-latency-random-variable", rv),
739		zap.String("from", s.From()),
740		zap.String("to", s.To()),
741	)
742}
743
744func (s *server) UndelayTx() {
745	s.latencyTxMu.Lock()
746	d := s.latencyTx
747	s.latencyTx = 0
748	s.latencyTxMu.Unlock()
749
750	s.lg.Info(
751		"removed transmit latency",
752		zap.Duration("latency", d),
753		zap.String("from", s.From()),
754		zap.String("to", s.To()),
755	)
756}
757
758func (s *server) LatencyTx() time.Duration {
759	s.latencyTxMu.RLock()
760	d := s.latencyTx
761	s.latencyTxMu.RUnlock()
762	return d
763}
764
765func (s *server) DelayRx(latency, rv time.Duration) {
766	if latency <= 0 {
767		return
768	}
769	d := computeLatency(latency, rv)
770	s.latencyRxMu.Lock()
771	s.latencyRx = d
772	s.latencyRxMu.Unlock()
773
774	s.lg.Info(
775		"set receive latency",
776		zap.Duration("latency", d),
777		zap.Duration("given-latency", latency),
778		zap.Duration("given-latency-random-variable", rv),
779		zap.String("from", s.To()),
780		zap.String("to", s.From()),
781	)
782}
783
784func (s *server) UndelayRx() {
785	s.latencyRxMu.Lock()
786	d := s.latencyRx
787	s.latencyRx = 0
788	s.latencyRxMu.Unlock()
789
790	s.lg.Info(
791		"removed receive latency",
792		zap.Duration("latency", d),
793		zap.String("from", s.To()),
794		zap.String("to", s.From()),
795	)
796}
797
798func (s *server) LatencyRx() time.Duration {
799	s.latencyRxMu.RLock()
800	d := s.latencyRx
801	s.latencyRxMu.RUnlock()
802	return d
803}
804
// computeLatency returns lat shifted by a random offset whose magnitude is
// strictly less than rv.
//
// A zero rv returns lat unchanged. A negative rv is treated as its absolute
// value, and an rv larger than lat is reduced to lat/10. If rv is still not
// positive after that (lat/10 rounds to zero, or rv cannot be negated), lat
// is returned as-is rather than asking the generator for a value in an empty
// range, which would panic. The sign of the offset follows the parity of the
// current wall-clock second.
func computeLatency(lat, rv time.Duration) time.Duration {
	if rv == 0 {
		return lat
	}
	if rv < 0 {
		rv *= -1
	}
	if rv > lat {
		rv = lat / 10
	}
	if rv <= 0 {
		// nothing to randomize within; Int63n requires a positive bound
		return lat
	}

	now := time.Now()
	// private generator: avoids reseeding the process-wide math/rand source
	rng := mrand.New(mrand.NewSource(now.UnixNano()))
	sign := int64(1)
	if now.Second()%2 == 0 {
		sign = -1
	}
	return lat + time.Duration(sign*rng.Int63n(rv.Nanoseconds()))
}
823
824func (s *server) ModifyTx(f func([]byte) []byte) {
825	s.modifyTxMu.Lock()
826	s.modifyTx = f
827	s.modifyTxMu.Unlock()
828
829	s.lg.Info(
830		"modifying tx",
831		zap.String("from", s.From()),
832		zap.String("to", s.To()),
833	)
834}
835
836func (s *server) UnmodifyTx() {
837	s.modifyTxMu.Lock()
838	s.modifyTx = nil
839	s.modifyTxMu.Unlock()
840
841	s.lg.Info(
842		"unmodifyed tx",
843		zap.String("from", s.From()),
844		zap.String("to", s.To()),
845	)
846}
847
848func (s *server) ModifyRx(f func([]byte) []byte) {
849	s.modifyRxMu.Lock()
850	s.modifyRx = f
851	s.modifyRxMu.Unlock()
852	s.lg.Info(
853		"modifying rx",
854		zap.String("from", s.To()),
855		zap.String("to", s.From()),
856	)
857}
858
859func (s *server) UnmodifyRx() {
860	s.modifyRxMu.Lock()
861	s.modifyRx = nil
862	s.modifyRxMu.Unlock()
863
864	s.lg.Info(
865		"unmodifyed rx",
866		zap.String("from", s.To()),
867		zap.String("to", s.From()),
868	)
869}
870
871func (s *server) BlackholeTx() {
872	s.ModifyTx(func([]byte) []byte { return nil })
873	s.lg.Info(
874		"blackholed tx",
875		zap.String("from", s.From()),
876		zap.String("to", s.To()),
877	)
878}
879
880func (s *server) UnblackholeTx() {
881	s.UnmodifyTx()
882	s.lg.Info(
883		"unblackholed tx",
884		zap.String("from", s.From()),
885		zap.String("to", s.To()),
886	)
887}
888
889func (s *server) BlackholeRx() {
890	s.ModifyRx(func([]byte) []byte { return nil })
891	s.lg.Info(
892		"blackholed rx",
893		zap.String("from", s.To()),
894		zap.String("to", s.From()),
895	)
896}
897
898func (s *server) UnblackholeRx() {
899	s.UnmodifyRx()
900	s.lg.Info(
901		"unblackholed rx",
902		zap.String("from", s.To()),
903		zap.String("to", s.From()),
904	)
905}
906
907func (s *server) PauseTx() {
908	s.pauseTxMu.Lock()
909	s.pauseTxc = make(chan struct{})
910	s.pauseTxMu.Unlock()
911
912	s.lg.Info(
913		"paused tx",
914		zap.String("from", s.From()),
915		zap.String("to", s.To()),
916	)
917}
918
919func (s *server) UnpauseTx() {
920	s.pauseTxMu.Lock()
921	select {
922	case <-s.pauseTxc: // already unpaused
923	case <-s.donec:
924		s.pauseTxMu.Unlock()
925		return
926	default:
927		close(s.pauseTxc)
928	}
929	s.pauseTxMu.Unlock()
930
931	s.lg.Info(
932		"unpaused tx",
933		zap.String("from", s.From()),
934		zap.String("to", s.To()),
935	)
936}
937
938func (s *server) PauseRx() {
939	s.pauseRxMu.Lock()
940	s.pauseRxc = make(chan struct{})
941	s.pauseRxMu.Unlock()
942
943	s.lg.Info(
944		"paused rx",
945		zap.String("from", s.To()),
946		zap.String("to", s.From()),
947	)
948}
949
950func (s *server) UnpauseRx() {
951	s.pauseRxMu.Lock()
952	select {
953	case <-s.pauseRxc: // already unpaused
954	case <-s.donec:
955		s.pauseRxMu.Unlock()
956		return
957	default:
958		close(s.pauseRxc)
959	}
960	s.pauseRxMu.Unlock()
961
962	s.lg.Info(
963		"unpaused rx",
964		zap.String("from", s.To()),
965		zap.String("to", s.From()),
966	)
967}
968
969func (s *server) ResetListener() error {
970	s.listenerMu.Lock()
971	defer s.listenerMu.Unlock()
972
973	if err := s.listener.Close(); err != nil {
974		// already closed
975		if !strings.HasSuffix(err.Error(), "use of closed network connection") {
976			return err
977		}
978	}
979
980	var ln net.Listener
981	var err error
982	if !s.tlsInfo.Empty() {
983		ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo)
984	} else {
985		ln, err = net.Listen(s.from.Scheme, s.from.Host)
986	}
987	if err != nil {
988		return err
989	}
990	s.listener = ln
991
992	s.lg.Info(
993		"reset listener on",
994		zap.String("from", s.From()),
995	)
996	return nil
997}
998