1package proxy
2
3import (
4	"crypto/x509"
5
6	"github.com/hashicorp/consul/api"
7	"github.com/hashicorp/consul/connect"
8	"github.com/hashicorp/consul/lib"
9	"github.com/hashicorp/go-hclog"
10)
11
12// Proxy implements the built-in connect proxy.
13type Proxy struct {
14	client     *api.Client
15	cfgWatcher ConfigWatcher
16	stopChan   chan struct{}
17	logger     hclog.Logger
18	service    *connect.Service
19}
20
21// New returns a proxy with the given configuration source.
22//
23// The ConfigWatcher can be used to update the configuration of the proxy.
24// Whenever a new configuration is detected, the proxy will reconfigure itself.
25func New(client *api.Client, cw ConfigWatcher, logger hclog.Logger) (*Proxy, error) {
26	return &Proxy{
27		client:     client,
28		cfgWatcher: cw,
29		stopChan:   make(chan struct{}),
30		logger:     logger,
31	}, nil
32}
33
34// Serve the proxy instance until a fatal error occurs or proxy is closed.
35func (p *Proxy) Serve() error {
36	var cfg *Config
37
38	// failCh is used to stop Serve and return an error from another goroutine we
39	// spawn.
40	failCh := make(chan error, 1)
41
42	// Watch for config changes (initial setup happens on first "change")
43	for {
44		select {
45		case err := <-failCh:
46			// don't log here, we can log with better context at the point where we
47			// write the err to the chan
48			return err
49
50		case newCfg := <-p.cfgWatcher.Watch():
51			p.logger.Debug("got new config")
52
53			if cfg == nil {
54				// Initial setup
55
56				// Setup telemetry if configured
57				_, err := lib.InitTelemetry(newCfg.Telemetry)
58				if err != nil {
59					p.logger.Error("proxy telemetry config error", "error", err)
60				}
61
62				// Setup Service instance now we know target ID etc
63				service, err := newCfg.Service(p.client, p.logger)
64				if err != nil {
65					return err
66				}
67				p.service = service
68
69				go func() {
70					<-service.ReadyWait()
71					p.logger.Info("Proxy loaded config and ready to serve")
72					tcfg := service.ServerTLSConfig()
73					cert, _ := tcfg.GetCertificate(nil)
74					leaf, _ := x509.ParseCertificate(cert.Certificate[0])
75					roots, err := connect.CommonNamesFromCertPool(tcfg.RootCAs)
76					if err != nil {
77						p.logger.Error("Failed to parse root subjects", "error", err)
78					} else {
79						p.logger.Info("Parsed TLS identity", "uri", leaf.URIs[0], "roots", roots)
80					}
81
82					// Only start a listener if we have a port set. This allows
83					// the configuration to disable our public listener.
84					if newCfg.PublicListener.BindPort != 0 {
85						newCfg.PublicListener.applyDefaults()
86						l := NewPublicListener(p.service, newCfg.PublicListener, p.logger)
87						err = p.startListener("public listener", l)
88						if err != nil {
89							// This should probably be fatal.
90							p.logger.Error("failed to start public listener", "error", err)
91							failCh <- err
92						}
93
94					}
95				}()
96			}
97
98			// TODO(banks) update/remove upstreams properly based on a diff with current. Can
99			// store a map of uc.String() to Listener here and then use it to only
100			// start one of each and stop/modify if changes occur.
101			for _, uc := range newCfg.Upstreams {
102				uc.applyDefaults()
103
104				if uc.LocalBindPort < 1 {
105					p.logger.Error("upstream has no local_bind_port. "+
106						"Can't start upstream.", "upstream", uc.String())
107					continue
108				}
109
110				l := NewUpstreamListener(p.service, p.client, uc, p.logger)
111				err := p.startListener(uc.String(), l)
112				if err != nil {
113					p.logger.Error("failed to start upstream",
114						"upstream", uc.String(),
115						"error", err,
116					)
117				}
118			}
119			cfg = newCfg
120
121		case <-p.stopChan:
122			return nil
123		}
124	}
125}
126
127// startPublicListener is run from the internal state machine loop
128func (p *Proxy) startListener(name string, l *Listener) error {
129	p.logger.Info("Starting listener", "listener", name, "bind_addr", l.BindAddr())
130	go func() {
131		err := l.Serve()
132		if err != nil {
133			p.logger.Error("listener stopped with error", "listener", name, "error", err)
134			return
135		}
136		p.logger.Info("listener stopped", "listener", name)
137	}()
138
139	go func() {
140		<-p.stopChan
141		l.Close()
142
143	}()
144
145	return nil
146}
147
148// Close stops the proxy and terminates all active connections. It must be
149// called only once.
150func (p *Proxy) Close() {
151	close(p.stopChan)
152	if p.service != nil {
153		p.service.Close()
154	}
155}
156