1package watch
2
3import (
4	"log"
5	"sync"
6	"time"
7
8	dep "github.com/hashicorp/consul-template/dependency"
9	"github.com/pkg/errors"
10)
11
12// dataBufferSize is the default number of views to process in a batch.
13const dataBufferSize = 2048
14
15type RetryFunc func(int) (bool, time.Duration)
16
17// Watcher is a top-level manager for views that poll Consul for data.
18type Watcher struct {
19	sync.Mutex
20
21	// clients is the collection of API clients to talk to upstreams.
22	clients *dep.ClientSet
23
24	// dataCh is the chan where Views will be published.
25	dataCh chan *View
26
27	// errCh is the chan where any errors will be published.
28	errCh chan error
29
30	// depViewMap is a map of Templates to Views. Templates are keyed by
31	// their string.
32	depViewMap map[string]*View
33
34	// maxStale specifies the maximum staleness of a query response.
35	maxStale time.Duration
36
37	// once signals if this watcher should tell views to retrieve data exactly
38	// one time instead of polling infinitely.
39	once bool
40
41	// retryFuncs specifies the different ways to retry based on the upstream.
42	retryFuncConsul  RetryFunc
43	retryFuncDefault RetryFunc
44	retryFuncVault   RetryFunc
45
46	// vaultGrace is the grace period between a lease and the max TTL for which
47	// Consul Template will generate a new secret instead of renewing an existing
48	// one.
49	vaultGrace time.Duration
50}
51
52type NewWatcherInput struct {
53	// Clients is the client set to communicate with upstreams.
54	Clients *dep.ClientSet
55
56	// MaxStale is the maximum staleness of a query.
57	MaxStale time.Duration
58
59	// Once specifies this watcher should tell views to poll exactly once.
60	Once bool
61
62	// RenewVault indicates if this watcher should renew Vault tokens.
63	RenewVault bool
64
65	// VaultToken is the vault token to renew.
66	VaultToken string
67
68	// VaultAgentTokenFile is the path to Vault Agent token file
69	VaultAgentTokenFile string
70
71	// RetryFuncs specify the different ways to retry based on the upstream.
72	RetryFuncConsul  RetryFunc
73	RetryFuncDefault RetryFunc
74	RetryFuncVault   RetryFunc
75
76	// VaultGrace is the grace period between a lease and the max TTL for which
77	// Consul Template will generate a new secret instead of renewing an existing
78	// one.
79	VaultGrace time.Duration
80}
81
82// NewWatcher creates a new watcher using the given API client.
83func NewWatcher(i *NewWatcherInput) (*Watcher, error) {
84	w := &Watcher{
85		clients:          i.Clients,
86		depViewMap:       make(map[string]*View),
87		dataCh:           make(chan *View, dataBufferSize),
88		errCh:            make(chan error),
89		maxStale:         i.MaxStale,
90		once:             i.Once,
91		retryFuncConsul:  i.RetryFuncConsul,
92		retryFuncDefault: i.RetryFuncDefault,
93		retryFuncVault:   i.RetryFuncVault,
94		vaultGrace:       i.VaultGrace,
95	}
96
97	// Start a watcher for the Vault renew if that config was specified
98	if i.RenewVault {
99		vt, err := dep.NewVaultTokenQuery(i.VaultToken)
100		if err != nil {
101			return nil, errors.Wrap(err, "watcher")
102		}
103		if _, err := w.Add(vt); err != nil {
104			return nil, errors.Wrap(err, "watcher")
105		}
106	}
107
108	if len(i.VaultAgentTokenFile) > 0 {
109		vag, err := dep.NewVaultAgentTokenQuery(i.VaultAgentTokenFile)
110		if err != nil {
111			return nil, errors.Wrap(err, "watcher")
112		}
113		if _, err := w.Add(vag); err != nil {
114			return nil, errors.Wrap(err, "watcher")
115		}
116	}
117
118	return w, nil
119}
120
121// DataCh returns a read-only channel of Views which is populated when a view
122// receives data from its upstream.
123func (w *Watcher) DataCh() <-chan *View {
124	return w.dataCh
125}
126
127// ErrCh returns a read-only channel of errors returned by the upstream.
128func (w *Watcher) ErrCh() <-chan error {
129	return w.errCh
130}
131
132// Add adds the given dependency to the list of monitored dependencies
133// and start the associated view. If the dependency already exists, no action is
134// taken.
135//
136// If the Dependency already existed, it this function will return false. If the
137// view was successfully created, it will return true. If an error occurs while
138// creating the view, it will be returned here (but future errors returned by
139// the view will happen on the channel).
140func (w *Watcher) Add(d dep.Dependency) (bool, error) {
141	w.Lock()
142	defer w.Unlock()
143
144	log.Printf("[DEBUG] (watcher) adding %s", d)
145
146	if _, ok := w.depViewMap[d.String()]; ok {
147		log.Printf("[TRACE] (watcher) %s already exists, skipping", d)
148		return false, nil
149	}
150
151	// Choose the correct retry function based off of the dependency's type.
152	var retryFunc RetryFunc
153	switch d.Type() {
154	case dep.TypeConsul:
155		retryFunc = w.retryFuncConsul
156	case dep.TypeVault:
157		retryFunc = w.retryFuncVault
158	default:
159		retryFunc = w.retryFuncDefault
160	}
161
162	v, err := NewView(&NewViewInput{
163		Dependency: d,
164		Clients:    w.clients,
165		MaxStale:   w.maxStale,
166		Once:       w.once,
167		RetryFunc:  retryFunc,
168		VaultGrace: w.vaultGrace,
169	})
170	if err != nil {
171		return false, errors.Wrap(err, "watcher")
172	}
173
174	log.Printf("[TRACE] (watcher) %s starting", d)
175
176	w.depViewMap[d.String()] = v
177	go v.poll(w.dataCh, w.errCh)
178
179	return true, nil
180}
181
182// Watching determines if the given dependency is being watched.
183func (w *Watcher) Watching(d dep.Dependency) bool {
184	w.Lock()
185	defer w.Unlock()
186
187	_, ok := w.depViewMap[d.String()]
188	return ok
189}
190
191// ForceWatching is used to force setting the internal state of watching
192// a dependency. This is only used for unit testing purposes.
193func (w *Watcher) ForceWatching(d dep.Dependency, enabled bool) {
194	w.Lock()
195	defer w.Unlock()
196
197	if enabled {
198		w.depViewMap[d.String()] = nil
199	} else {
200		delete(w.depViewMap, d.String())
201	}
202}
203
204// Remove removes the given dependency from the list and stops the
205// associated View. If a View for the given dependency does not exist, this
206// function will return false. If the View does exist, this function will return
207// true upon successful deletion.
208func (w *Watcher) Remove(d dep.Dependency) bool {
209	w.Lock()
210	defer w.Unlock()
211
212	log.Printf("[DEBUG] (watcher) removing %s", d)
213
214	if view, ok := w.depViewMap[d.String()]; ok {
215		log.Printf("[TRACE] (watcher) actually removing %s", d)
216		view.stop()
217		delete(w.depViewMap, d.String())
218		return true
219	}
220
221	log.Printf("[TRACE] (watcher) %s did not exist, skipping", d)
222	return false
223}
224
225// Size returns the number of views this watcher is watching.
226func (w *Watcher) Size() int {
227	w.Lock()
228	defer w.Unlock()
229	return len(w.depViewMap)
230}
231
232// Stop halts this watcher and any currently polling views immediately. If a
233// view was in the middle of a poll, no data will be returned.
234func (w *Watcher) Stop() {
235	w.Lock()
236	defer w.Unlock()
237
238	log.Printf("[DEBUG] (watcher) stopping all views")
239
240	for _, view := range w.depViewMap {
241		if view == nil {
242			continue
243		}
244		log.Printf("[TRACE] (watcher) stopping %s", view.Dependency())
245		view.stop()
246	}
247
248	// Reset the map to have no views
249	w.depViewMap = make(map[string]*View)
250
251	// Close any idle TCP connections
252	w.clients.Stop()
253}
254