1package proxy 2 3import ( 4 "crypto/x509" 5 6 "github.com/hashicorp/consul/api" 7 "github.com/hashicorp/consul/connect" 8 "github.com/hashicorp/consul/lib" 9 "github.com/hashicorp/go-hclog" 10) 11 12// Proxy implements the built-in connect proxy. 13type Proxy struct { 14 client *api.Client 15 cfgWatcher ConfigWatcher 16 stopChan chan struct{} 17 logger hclog.Logger 18 service *connect.Service 19} 20 21// New returns a proxy with the given configuration source. 22// 23// The ConfigWatcher can be used to update the configuration of the proxy. 24// Whenever a new configuration is detected, the proxy will reconfigure itself. 25func New(client *api.Client, cw ConfigWatcher, logger hclog.Logger) (*Proxy, error) { 26 return &Proxy{ 27 client: client, 28 cfgWatcher: cw, 29 stopChan: make(chan struct{}), 30 logger: logger, 31 }, nil 32} 33 34// Serve the proxy instance until a fatal error occurs or proxy is closed. 35func (p *Proxy) Serve() error { 36 var cfg *Config 37 38 // failCh is used to stop Serve and return an error from another goroutine we 39 // spawn. 40 failCh := make(chan error, 1) 41 42 // Watch for config changes (initial setup happens on first "change") 43 for { 44 select { 45 case err := <-failCh: 46 // don't log here, we can log with better context at the point where we 47 // write the err to the chan 48 return err 49 50 case newCfg := <-p.cfgWatcher.Watch(): 51 p.logger.Debug("got new config") 52 53 if cfg == nil { 54 // Initial setup 55 56 // Setup telemetry if configured 57 _, err := lib.InitTelemetry(newCfg.Telemetry) 58 if err != nil { 59 p.logger.Error("proxy telemetry config error", "error", err) 60 } 61 62 // Setup Service instance now we know target ID etc 63 service, err := newCfg.Service(p.client, p.logger) 64 if err != nil { 65 return err 66 } 67 p.service = service 68 69 go func() { 70 <-service.ReadyWait() 71 p.logger.Info("Proxy loaded config and ready to serve") 72 tcfg := service.ServerTLSConfig() 73 cert, _ := tcfg.GetCertificate(nil) 74 leaf, _ := x509.ParseCertificate(cert.Certificate[0]) 75 roots, err := connect.CommonNamesFromCertPool(tcfg.RootCAs) 76 if err != nil { 77 p.logger.Error("Failed to parse root subjects", "error", err) 78 } else { 79 p.logger.Info("Parsed TLS identity", "uri", leaf.URIs[0], "roots", roots) 80 } 81 82 // Only start a listener if we have a port set. This allows 83 // the configuration to disable our public listener. 84 if newCfg.PublicListener.BindPort != 0 { 85 newCfg.PublicListener.applyDefaults() 86 l := NewPublicListener(p.service, newCfg.PublicListener, p.logger) 87 err = p.startListener("public listener", l) 88 if err != nil { 89 // This should probably be fatal. 90 p.logger.Error("failed to start public listener", "error", err) 91 failCh <- err 92 } 93 94 } 95 }() 96 } 97 98 // TODO(banks) update/remove upstreams properly based on a diff with current. Can 99 // store a map of uc.String() to Listener here and then use it to only 100 // start one of each and stop/modify if changes occur. 101 for _, uc := range newCfg.Upstreams { 102 uc.applyDefaults() 103 104 if uc.LocalBindPort < 1 { 105 p.logger.Error("upstream has no local_bind_port. "+ 106 "Can't start upstream.", "upstream", uc.String()) 107 continue 108 } 109 110 l := NewUpstreamListener(p.service, p.client, uc, p.logger) 111 err := p.startListener(uc.String(), l) 112 if err != nil { 113 p.logger.Error("failed to start upstream", 114 "upstream", uc.String(), 115 "error", err, 116 ) 117 } 118 } 119 cfg = newCfg 120 121 case <-p.stopChan: 122 return nil 123 } 124 } 125} 126 127// startPublicListener is run from the internal state machine loop 128func (p *Proxy) startListener(name string, l *Listener) error { 129 p.logger.Info("Starting listener", "listener", name, "bind_addr", l.BindAddr()) 130 go func() { 131 err := l.Serve() 132 if err != nil { 133 p.logger.Error("listener stopped with error", "listener", name, "error", err) 134 return 135 } 136 p.logger.Info("listener stopped", "listener", name) 137 }() 138 139 go func() { 140 <-p.stopChan 141 l.Close() 142 143 }() 144 145 return nil 146} 147 148// Close stops the proxy and terminates all active connections. It must be 149// called only once. 150func (p *Proxy) Close() { 151 close(p.stopChan) 152 if p.service != nil { 153 p.service.Close() 154 } 155} 156