1// Copyright 2018 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package proxy 16 17import ( 18 "fmt" 19 "io" 20 mrand "math/rand" 21 "net" 22 "net/http" 23 "net/url" 24 "strconv" 25 "strings" 26 "sync" 27 "time" 28 29 "go.etcd.io/etcd/pkg/transport" 30 31 humanize "github.com/dustin/go-humanize" 32 "go.uber.org/zap" 33) 34 35var ( 36 defaultDialTimeout = 3 * time.Second 37 defaultBufferSize = 48 * 1024 38 defaultRetryInterval = 10 * time.Millisecond 39 defaultLogger *zap.Logger 40) 41 42func init() { 43 var err error 44 defaultLogger, err = zap.NewProduction() 45 if err != nil { 46 panic(err) 47 } 48} 49 50// Server defines proxy server layer that simulates common network faults: 51// latency spikes and packet drop or corruption. The proxy overhead is very 52// small overhead (<500μs per request). Please run tests to compute actual 53// overhead. 54type Server interface { 55 // From returns proxy source address in "scheme://host:port" format. 56 From() string 57 // To returns proxy destination address in "scheme://host:port" format. 58 To() string 59 60 // Ready returns when proxy is ready to serve. 61 Ready() <-chan struct{} 62 // Done returns when proxy has been closed. 63 Done() <-chan struct{} 64 // Error sends errors while serving proxy. 65 Error() <-chan error 66 // Close closes listener and transport. 67 Close() error 68 69 // PauseAccept stops accepting new connections. 70 PauseAccept() 71 // UnpauseAccept removes pause operation on accepting new connections. 72 UnpauseAccept() 73 74 // DelayAccept adds latency ± random variable to accepting 75 // new incoming connections. 76 DelayAccept(latency, rv time.Duration) 77 // UndelayAccept removes sending latencies. 78 UndelayAccept() 79 // LatencyAccept returns current latency on accepting 80 // new incoming connections. 81 LatencyAccept() time.Duration 82 83 // DelayTx adds latency ± random variable for "outgoing" traffic 84 // in "sending" layer. 85 DelayTx(latency, rv time.Duration) 86 // UndelayTx removes sending latencies. 87 UndelayTx() 88 // LatencyTx returns current send latency. 89 LatencyTx() time.Duration 90 91 // DelayRx adds latency ± random variable for "incoming" traffic 92 // in "receiving" layer. 93 DelayRx(latency, rv time.Duration) 94 // UndelayRx removes "receiving" latencies. 95 UndelayRx() 96 // LatencyRx returns current receive latency. 97 LatencyRx() time.Duration 98 99 // ModifyTx alters/corrupts/drops "outgoing" packets from the listener 100 // with the given edit function. 101 ModifyTx(f func(data []byte) []byte) 102 // UnmodifyTx removes modify operation on "forwarding". 103 UnmodifyTx() 104 105 // ModifyRx alters/corrupts/drops "incoming" packets to client 106 // with the given edit function. 107 ModifyRx(f func(data []byte) []byte) 108 // UnmodifyRx removes modify operation on "receiving". 109 UnmodifyRx() 110 111 // BlackholeTx drops all "outgoing" packets before "forwarding". 112 // "BlackholeTx" operation is a wrapper around "ModifyTx" with 113 // a function that returns empty bytes. 114 BlackholeTx() 115 // UnblackholeTx removes blackhole operation on "sending". 116 UnblackholeTx() 117 118 // BlackholeRx drops all "incoming" packets to client. 119 // "BlackholeRx" operation is a wrapper around "ModifyRx" with 120 // a function that returns empty bytes. 121 BlackholeRx() 122 // UnblackholeRx removes blackhole operation on "receiving". 123 UnblackholeRx() 124 125 // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. 126 PauseTx() 127 // UnpauseTx removes "forwarding" pause operation. 128 UnpauseTx() 129 130 // PauseRx stops "receiving" packets; "incoming" traffic blocks. 131 PauseRx() 132 // UnpauseRx removes "receiving" pause operation. 133 UnpauseRx() 134 135 // ResetListener closes and restarts listener. 136 ResetListener() error 137} 138 139// ServerConfig defines proxy server configuration. 140type ServerConfig struct { 141 Logger *zap.Logger 142 From url.URL 143 To url.URL 144 TLSInfo transport.TLSInfo 145 DialTimeout time.Duration 146 BufferSize int 147 RetryInterval time.Duration 148} 149 150type server struct { 151 lg *zap.Logger 152 153 from url.URL 154 fromPort int 155 to url.URL 156 toPort int 157 158 tlsInfo transport.TLSInfo 159 dialTimeout time.Duration 160 161 bufferSize int 162 retryInterval time.Duration 163 164 readyc chan struct{} 165 donec chan struct{} 166 errc chan error 167 168 closeOnce sync.Once 169 closeWg sync.WaitGroup 170 171 listenerMu sync.RWMutex 172 listener net.Listener 173 174 pauseAcceptMu sync.Mutex 175 pauseAcceptc chan struct{} 176 177 latencyAcceptMu sync.RWMutex 178 latencyAccept time.Duration 179 180 modifyTxMu sync.RWMutex 181 modifyTx func(data []byte) []byte 182 183 modifyRxMu sync.RWMutex 184 modifyRx func(data []byte) []byte 185 186 pauseTxMu sync.Mutex 187 pauseTxc chan struct{} 188 189 pauseRxMu sync.Mutex 190 pauseRxc chan struct{} 191 192 latencyTxMu sync.RWMutex 193 latencyTx time.Duration 194 195 latencyRxMu sync.RWMutex 196 latencyRx time.Duration 197} 198 199// NewServer returns a proxy implementation with no iptables/tc dependencies. 200// The proxy layer overhead is <1ms. 201func NewServer(cfg ServerConfig) Server { 202 s := &server{ 203 lg: cfg.Logger, 204 205 from: cfg.From, 206 to: cfg.To, 207 208 tlsInfo: cfg.TLSInfo, 209 dialTimeout: cfg.DialTimeout, 210 211 bufferSize: cfg.BufferSize, 212 retryInterval: cfg.RetryInterval, 213 214 readyc: make(chan struct{}), 215 donec: make(chan struct{}), 216 errc: make(chan error, 16), 217 218 pauseAcceptc: make(chan struct{}), 219 pauseTxc: make(chan struct{}), 220 pauseRxc: make(chan struct{}), 221 } 222 223 _, fromPort, err := net.SplitHostPort(cfg.From.Host) 224 if err == nil { 225 s.fromPort, _ = strconv.Atoi(fromPort) 226 } 227 var toPort string 228 _, toPort, err = net.SplitHostPort(cfg.To.Host) 229 if err == nil { 230 s.toPort, _ = strconv.Atoi(toPort) 231 } 232 233 if s.dialTimeout == 0 { 234 s.dialTimeout = defaultDialTimeout 235 } 236 if s.bufferSize == 0 { 237 s.bufferSize = defaultBufferSize 238 } 239 if s.retryInterval == 0 { 240 s.retryInterval = defaultRetryInterval 241 } 242 if s.lg == nil { 243 s.lg = defaultLogger 244 } 245 246 close(s.pauseAcceptc) 247 close(s.pauseTxc) 248 close(s.pauseRxc) 249 250 if strings.HasPrefix(s.from.Scheme, "http") { 251 s.from.Scheme = "tcp" 252 } 253 if strings.HasPrefix(s.to.Scheme, "http") { 254 s.to.Scheme = "tcp" 255 } 256 257 addr := fmt.Sprintf(":%d", s.fromPort) 258 if s.fromPort == 0 { // unix 259 addr = s.from.Host 260 } 261 262 var ln net.Listener 263 if !s.tlsInfo.Empty() { 264 ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo) 265 } else { 266 ln, err = net.Listen(s.from.Scheme, addr) 267 } 268 if err != nil { 269 s.errc <- err 270 s.Close() 271 return s 272 } 273 s.listener = ln 274 275 s.closeWg.Add(1) 276 go s.listenAndServe() 277 278 s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To())) 279 return s 280} 281 282func (s *server) From() string { 283 return fmt.Sprintf("%s://%s", s.from.Scheme, s.from.Host) 284} 285 286func (s *server) To() string { 287 return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host) 288} 289 290// TODO: implement packet reordering from multiple TCP connections 291// buffer packets per connection for awhile, reorder before transmit 292// - https://github.com/etcd-io/etcd/issues/5614 293// - https://github.com/etcd-io/etcd/pull/6918#issuecomment-264093034 294 295func (s *server) listenAndServe() { 296 defer s.closeWg.Done() 297 298 s.lg.Info("proxy is listening on", zap.String("from", s.From())) 299 close(s.readyc) 300 301 for { 302 s.pauseAcceptMu.Lock() 303 pausec := s.pauseAcceptc 304 s.pauseAcceptMu.Unlock() 305 select { 306 case <-pausec: 307 case <-s.donec: 308 return 309 } 310 311 s.latencyAcceptMu.RLock() 312 lat := s.latencyAccept 313 s.latencyAcceptMu.RUnlock() 314 if lat > 0 { 315 select { 316 case <-time.After(lat): 317 case <-s.donec: 318 return 319 } 320 } 321 322 s.listenerMu.RLock() 323 ln := s.listener 324 s.listenerMu.RUnlock() 325 326 in, err := ln.Accept() 327 if err != nil { 328 select { 329 case s.errc <- err: 330 select { 331 case <-s.donec: 332 return 333 default: 334 } 335 case <-s.donec: 336 return 337 } 338 s.lg.Debug("listener accept error", zap.Error(err)) 339 340 if strings.HasSuffix(err.Error(), "use of closed network connection") { 341 select { 342 case <-time.After(s.retryInterval): 343 case <-s.donec: 344 return 345 } 346 s.lg.Debug("listener is closed; retry listening on", zap.String("from", s.From())) 347 348 if err = s.ResetListener(); err != nil { 349 select { 350 case s.errc <- err: 351 select { 352 case <-s.donec: 353 return 354 default: 355 } 356 case <-s.donec: 357 return 358 } 359 s.lg.Warn("failed to reset listener", zap.Error(err)) 360 } 361 } 362 363 continue 364 } 365 366 var out net.Conn 367 if !s.tlsInfo.Empty() { 368 var tp *http.Transport 369 tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout) 370 if err != nil { 371 select { 372 case s.errc <- err: 373 select { 374 case <-s.donec: 375 return 376 default: 377 } 378 case <-s.donec: 379 return 380 } 381 continue 382 } 383 out, err = tp.Dial(s.to.Scheme, s.to.Host) 384 } else { 385 out, err = net.Dial(s.to.Scheme, s.to.Host) 386 } 387 if err != nil { 388 select { 389 case s.errc <- err: 390 select { 391 case <-s.donec: 392 return 393 default: 394 } 395 case <-s.donec: 396 return 397 } 398 s.lg.Debug("failed to dial", zap.Error(err)) 399 continue 400 } 401 402 go func() { 403 // read incoming bytes from listener, dispatch to outgoing connection 404 s.transmit(out, in) 405 out.Close() 406 in.Close() 407 }() 408 go func() { 409 // read response from outgoing connection, write back to listener 410 s.receive(in, out) 411 in.Close() 412 out.Close() 413 }() 414 } 415} 416 417func (s *server) transmit(dst io.Writer, src io.Reader) { 418 s.ioCopy(dst, src, proxyTx) 419} 420 421func (s *server) receive(dst io.Writer, src io.Reader) { 422 s.ioCopy(dst, src, proxyRx) 423} 424 425type proxyType uint8 426 427const ( 428 proxyTx proxyType = iota 429 proxyRx 430) 431 432func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { 433 buf := make([]byte, s.bufferSize) 434 for { 435 nr1, err := src.Read(buf) 436 if err != nil { 437 if err == io.EOF { 438 return 439 } 440 // connection already closed 441 if strings.HasSuffix(err.Error(), "read: connection reset by peer") { 442 return 443 } 444 if strings.HasSuffix(err.Error(), "use of closed network connection") { 445 return 446 } 447 select { 448 case s.errc <- err: 449 select { 450 case <-s.donec: 451 return 452 default: 453 } 454 case <-s.donec: 455 return 456 } 457 s.lg.Debug("failed to read", zap.Error(err)) 458 return 459 } 460 if nr1 == 0 { 461 return 462 } 463 data := buf[:nr1] 464 465 // alters/corrupts/drops data 466 switch ptype { 467 case proxyTx: 468 s.modifyTxMu.RLock() 469 if s.modifyTx != nil { 470 data = s.modifyTx(data) 471 } 472 s.modifyTxMu.RUnlock() 473 case proxyRx: 474 s.modifyRxMu.RLock() 475 if s.modifyRx != nil { 476 data = s.modifyRx(data) 477 } 478 s.modifyRxMu.RUnlock() 479 default: 480 panic("unknown proxy type") 481 } 482 nr2 := len(data) 483 switch ptype { 484 case proxyTx: 485 s.lg.Debug( 486 "modified tx", 487 zap.String("data-received", humanize.Bytes(uint64(nr1))), 488 zap.String("data-modified", humanize.Bytes(uint64(nr2))), 489 zap.String("from", s.From()), 490 zap.String("to", s.To()), 491 ) 492 case proxyRx: 493 s.lg.Debug( 494 "modified rx", 495 zap.String("data-received", humanize.Bytes(uint64(nr1))), 496 zap.String("data-modified", humanize.Bytes(uint64(nr2))), 497 zap.String("from", s.To()), 498 zap.String("to", s.From()), 499 ) 500 default: 501 panic("unknown proxy type") 502 } 503 504 // pause before packet dropping, blocking, and forwarding 505 var pausec chan struct{} 506 switch ptype { 507 case proxyTx: 508 s.pauseTxMu.Lock() 509 pausec = s.pauseTxc 510 s.pauseTxMu.Unlock() 511 case proxyRx: 512 s.pauseRxMu.Lock() 513 pausec = s.pauseRxc 514 s.pauseRxMu.Unlock() 515 default: 516 panic("unknown proxy type") 517 } 518 select { 519 case <-pausec: 520 case <-s.donec: 521 return 522 } 523 524 // pause first, and then drop packets 525 if nr2 == 0 { 526 continue 527 } 528 529 // block before forwarding 530 var lat time.Duration 531 switch ptype { 532 case proxyTx: 533 s.latencyTxMu.RLock() 534 lat = s.latencyTx 535 s.latencyTxMu.RUnlock() 536 case proxyRx: 537 s.latencyRxMu.RLock() 538 lat = s.latencyRx 539 s.latencyRxMu.RUnlock() 540 default: 541 panic("unknown proxy type") 542 } 543 if lat > 0 { 544 select { 545 case <-time.After(lat): 546 case <-s.donec: 547 return 548 } 549 } 550 551 // now forward packets to target 552 var nw int 553 nw, err = dst.Write(data) 554 if err != nil { 555 if err == io.EOF { 556 return 557 } 558 select { 559 case s.errc <- err: 560 select { 561 case <-s.donec: 562 return 563 default: 564 } 565 case <-s.donec: 566 return 567 } 568 switch ptype { 569 case proxyTx: 570 s.lg.Debug("write fail on tx", zap.Error(err)) 571 case proxyRx: 572 s.lg.Debug("write fail on rx", zap.Error(err)) 573 default: 574 panic("unknown proxy type") 575 } 576 return 577 } 578 579 if nr2 != nw { 580 select { 581 case s.errc <- io.ErrShortWrite: 582 select { 583 case <-s.donec: 584 return 585 default: 586 } 587 case <-s.donec: 588 return 589 } 590 switch ptype { 591 case proxyTx: 592 s.lg.Debug( 593 "write fail on tx; read/write bytes are different", 594 zap.Int("read-bytes", nr1), 595 zap.Int("write-bytes", nw), 596 zap.Error(io.ErrShortWrite), 597 ) 598 case proxyRx: 599 s.lg.Debug( 600 "write fail on rx; read/write bytes are different", 601 zap.Int("read-bytes", nr1), 602 zap.Int("write-bytes", nw), 603 zap.Error(io.ErrShortWrite), 604 ) 605 default: 606 panic("unknown proxy type") 607 } 608 return 609 } 610 611 switch ptype { 612 case proxyTx: 613 s.lg.Debug( 614 "transmitted", 615 zap.String("data-size", humanize.Bytes(uint64(nr1))), 616 zap.String("from", s.From()), 617 zap.String("to", s.To()), 618 ) 619 case proxyRx: 620 s.lg.Debug( 621 "received", 622 zap.String("data-size", humanize.Bytes(uint64(nr1))), 623 zap.String("from", s.To()), 624 zap.String("to", s.From()), 625 ) 626 default: 627 panic("unknown proxy type") 628 } 629 } 630} 631 632func (s *server) Ready() <-chan struct{} { return s.readyc } 633func (s *server) Done() <-chan struct{} { return s.donec } 634func (s *server) Error() <-chan error { return s.errc } 635func (s *server) Close() (err error) { 636 s.closeOnce.Do(func() { 637 close(s.donec) 638 s.listenerMu.Lock() 639 if s.listener != nil { 640 err = s.listener.Close() 641 s.lg.Info( 642 "closed proxy listener", 643 zap.String("from", s.From()), 644 zap.String("to", s.To()), 645 ) 646 } 647 s.lg.Sync() 648 s.listenerMu.Unlock() 649 }) 650 s.closeWg.Wait() 651 return err 652} 653 654func (s *server) PauseAccept() { 655 s.pauseAcceptMu.Lock() 656 s.pauseAcceptc = make(chan struct{}) 657 s.pauseAcceptMu.Unlock() 658 659 s.lg.Info( 660 "paused accept", 661 zap.String("from", s.From()), 662 zap.String("to", s.To()), 663 ) 664} 665 666func (s *server) UnpauseAccept() { 667 s.pauseAcceptMu.Lock() 668 select { 669 case <-s.pauseAcceptc: // already unpaused 670 case <-s.donec: 671 s.pauseAcceptMu.Unlock() 672 return 673 default: 674 close(s.pauseAcceptc) 675 } 676 s.pauseAcceptMu.Unlock() 677 678 s.lg.Info( 679 "unpaused accept", 680 zap.String("from", s.From()), 681 zap.String("to", s.To()), 682 ) 683} 684 685func (s *server) DelayAccept(latency, rv time.Duration) { 686 if latency <= 0 { 687 return 688 } 689 d := computeLatency(latency, rv) 690 s.latencyAcceptMu.Lock() 691 s.latencyAccept = d 692 s.latencyAcceptMu.Unlock() 693 694 s.lg.Info( 695 "set accept latency", 696 zap.Duration("latency", d), 697 zap.Duration("given-latency", latency), 698 zap.Duration("given-latency-random-variable", rv), 699 zap.String("from", s.From()), 700 zap.String("to", s.To()), 701 ) 702} 703 704func (s *server) UndelayAccept() { 705 s.latencyAcceptMu.Lock() 706 d := s.latencyAccept 707 s.latencyAccept = 0 708 s.latencyAcceptMu.Unlock() 709 710 s.lg.Info( 711 "removed accept latency", 712 zap.Duration("latency", d), 713 zap.String("from", s.From()), 714 zap.String("to", s.To()), 715 ) 716} 717 718func (s *server) LatencyAccept() time.Duration { 719 s.latencyAcceptMu.RLock() 720 d := s.latencyAccept 721 s.latencyAcceptMu.RUnlock() 722 return d 723} 724 725func (s *server) DelayTx(latency, rv time.Duration) { 726 if latency <= 0 { 727 return 728 } 729 d := computeLatency(latency, rv) 730 s.latencyTxMu.Lock() 731 s.latencyTx = d 732 s.latencyTxMu.Unlock() 733 734 s.lg.Info( 735 "set transmit latency", 736 zap.Duration("latency", d), 737 zap.Duration("given-latency", latency), 738 zap.Duration("given-latency-random-variable", rv), 739 zap.String("from", s.From()), 740 zap.String("to", s.To()), 741 ) 742} 743 744func (s *server) UndelayTx() { 745 s.latencyTxMu.Lock() 746 d := s.latencyTx 747 s.latencyTx = 0 748 s.latencyTxMu.Unlock() 749 750 s.lg.Info( 751 "removed transmit latency", 752 zap.Duration("latency", d), 753 zap.String("from", s.From()), 754 zap.String("to", s.To()), 755 ) 756} 757 758func (s *server) LatencyTx() time.Duration { 759 s.latencyTxMu.RLock() 760 d := s.latencyTx 761 s.latencyTxMu.RUnlock() 762 return d 763} 764 765func (s *server) DelayRx(latency, rv time.Duration) { 766 if latency <= 0 { 767 return 768 } 769 d := computeLatency(latency, rv) 770 s.latencyRxMu.Lock() 771 s.latencyRx = d 772 s.latencyRxMu.Unlock() 773 774 s.lg.Info( 775 "set receive latency", 776 zap.Duration("latency", d), 777 zap.Duration("given-latency", latency), 778 zap.Duration("given-latency-random-variable", rv), 779 zap.String("from", s.To()), 780 zap.String("to", s.From()), 781 ) 782} 783 784func (s *server) UndelayRx() { 785 s.latencyRxMu.Lock() 786 d := s.latencyRx 787 s.latencyRx = 0 788 s.latencyRxMu.Unlock() 789 790 s.lg.Info( 791 "removed receive latency", 792 zap.Duration("latency", d), 793 zap.String("from", s.To()), 794 zap.String("to", s.From()), 795 ) 796} 797 798func (s *server) LatencyRx() time.Duration { 799 s.latencyRxMu.RLock() 800 d := s.latencyRx 801 s.latencyRxMu.RUnlock() 802 return d 803} 804 805func computeLatency(lat, rv time.Duration) time.Duration { 806 if rv == 0 { 807 return lat 808 } 809 if rv < 0 { 810 rv *= -1 811 } 812 if rv > lat { 813 rv = lat / 10 814 } 815 now := time.Now() 816 mrand.Seed(int64(now.Nanosecond())) 817 sign := 1 818 if now.Second()%2 == 0 { 819 sign = -1 820 } 821 return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds())) 822} 823 824func (s *server) ModifyTx(f func([]byte) []byte) { 825 s.modifyTxMu.Lock() 826 s.modifyTx = f 827 s.modifyTxMu.Unlock() 828 829 s.lg.Info( 830 "modifying tx", 831 zap.String("from", s.From()), 832 zap.String("to", s.To()), 833 ) 834} 835 836func (s *server) UnmodifyTx() { 837 s.modifyTxMu.Lock() 838 s.modifyTx = nil 839 s.modifyTxMu.Unlock() 840 841 s.lg.Info( 842 "unmodifyed tx", 843 zap.String("from", s.From()), 844 zap.String("to", s.To()), 845 ) 846} 847 848func (s *server) ModifyRx(f func([]byte) []byte) { 849 s.modifyRxMu.Lock() 850 s.modifyRx = f 851 s.modifyRxMu.Unlock() 852 s.lg.Info( 853 "modifying rx", 854 zap.String("from", s.To()), 855 zap.String("to", s.From()), 856 ) 857} 858 859func (s *server) UnmodifyRx() { 860 s.modifyRxMu.Lock() 861 s.modifyRx = nil 862 s.modifyRxMu.Unlock() 863 864 s.lg.Info( 865 "unmodifyed rx", 866 zap.String("from", s.To()), 867 zap.String("to", s.From()), 868 ) 869} 870 871func (s *server) BlackholeTx() { 872 s.ModifyTx(func([]byte) []byte { return nil }) 873 s.lg.Info( 874 "blackholed tx", 875 zap.String("from", s.From()), 876 zap.String("to", s.To()), 877 ) 878} 879 880func (s *server) UnblackholeTx() { 881 s.UnmodifyTx() 882 s.lg.Info( 883 "unblackholed tx", 884 zap.String("from", s.From()), 885 zap.String("to", s.To()), 886 ) 887} 888 889func (s *server) BlackholeRx() { 890 s.ModifyRx(func([]byte) []byte { return nil }) 891 s.lg.Info( 892 "blackholed rx", 893 zap.String("from", s.To()), 894 zap.String("to", s.From()), 895 ) 896} 897 898func (s *server) UnblackholeRx() { 899 s.UnmodifyRx() 900 s.lg.Info( 901 "unblackholed rx", 902 zap.String("from", s.To()), 903 zap.String("to", s.From()), 904 ) 905} 906 907func (s *server) PauseTx() { 908 s.pauseTxMu.Lock() 909 s.pauseTxc = make(chan struct{}) 910 s.pauseTxMu.Unlock() 911 912 s.lg.Info( 913 "paused tx", 914 zap.String("from", s.From()), 915 zap.String("to", s.To()), 916 ) 917} 918 919func (s *server) UnpauseTx() { 920 s.pauseTxMu.Lock() 921 select { 922 case <-s.pauseTxc: // already unpaused 923 case <-s.donec: 924 s.pauseTxMu.Unlock() 925 return 926 default: 927 close(s.pauseTxc) 928 } 929 s.pauseTxMu.Unlock() 930 931 s.lg.Info( 932 "unpaused tx", 933 zap.String("from", s.From()), 934 zap.String("to", s.To()), 935 ) 936} 937 938func (s *server) PauseRx() { 939 s.pauseRxMu.Lock() 940 s.pauseRxc = make(chan struct{}) 941 s.pauseRxMu.Unlock() 942 943 s.lg.Info( 944 "paused rx", 945 zap.String("from", s.To()), 946 zap.String("to", s.From()), 947 ) 948} 949 950func (s *server) UnpauseRx() { 951 s.pauseRxMu.Lock() 952 select { 953 case <-s.pauseRxc: // already unpaused 954 case <-s.donec: 955 s.pauseRxMu.Unlock() 956 return 957 default: 958 close(s.pauseRxc) 959 } 960 s.pauseRxMu.Unlock() 961 962 s.lg.Info( 963 "unpaused rx", 964 zap.String("from", s.To()), 965 zap.String("to", s.From()), 966 ) 967} 968 969func (s *server) ResetListener() error { 970 s.listenerMu.Lock() 971 defer s.listenerMu.Unlock() 972 973 if err := s.listener.Close(); err != nil { 974 // already closed 975 if !strings.HasSuffix(err.Error(), "use of closed network connection") { 976 return err 977 } 978 } 979 980 var ln net.Listener 981 var err error 982 if !s.tlsInfo.Empty() { 983 ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo) 984 } else { 985 ln, err = net.Listen(s.from.Scheme, s.from.Host) 986 } 987 if err != nil { 988 return err 989 } 990 s.listener = ln 991 992 s.lg.Info( 993 "reset listener on", 994 zap.String("from", s.From()), 995 ) 996 return nil 997} 998