package manager

import (
	"bytes"
	"compress/lzw"
	"encoding/gob"
	"fmt"
	"log"
	"path"
	"sync"
	"time"

	"github.com/mitchellh/hashstructure"

	"github.com/hashicorp/consul-template/config"
	dep "github.com/hashicorp/consul-template/dependency"
	"github.com/hashicorp/consul-template/template"
	"github.com/hashicorp/consul-template/version"
	consulapi "github.com/hashicorp/consul/api"
)

var (
	// sessionCreateRetry is the amount of time we wait
	// to recreate a session when lost.
	sessionCreateRetry = 15 * time.Second

	// lockRetry is the interval on which we try to re-acquire locks.
	lockRetry = 10 * time.Second

	// listRetry is the interval on which we retry listing a data path.
	listRetry = 10 * time.Second

	// lockWaitTime is the timeout passed through to the consul api client's
	// Lock call; declared here so it can be overridden in testing (see
	// ./dedup_test.go).
	lockWaitTime = 15 * time.Second
)

const (
	templateNoDataStr = "__NO_DATA__"
)

// templateData is GOB encoded to share the dependency values
type templateData struct {
	// Version is the version of Consul Template which created this template data.
	// This is important because users may be running multiple versions of CT
	// with the same templates. This provides a nicer upgrade path.
	Version string

	// Data is the actual template data.
	Data map[string]interface{}
}

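// templateNoData returns the sentinel value that the lock holder writes to
// the data key before publishing real data; followers skip parsing it (see
// watchTemplate).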
func templateNoData() []byte {
	return []byte(templateNoDataStr)
}

// DedupManager is used to de-duplicate which instance of Consul-Template
// is handling each template. For each template, a lock path is determined
// using the MD5 of the template. This path is used to elect a "leader"
// instance.
//
// The leader instance operates as usual, but any time a template is
// rendered, any of the data required for rendering is stored in the
// Consul KV store under the lock path.
//
// The follower instances depend on the leader to do the primary watching
// and rendering, and instead only watch the aggregated data in the KV.
// Followers wait for updates and re-render the template.
//
// If a template depends on 50 views, and is running on 50 machines, that
// would normally require 2500 blocking queries. Using deduplication, one
// instance has 50 view queries, plus 50 additional queries on the lock
// path for a total of 100.
type DedupManager struct {
	// config is the deduplicate configuration
	config *config.DedupConfig

	// clients is used to access the underlying clients
	clients *dep.ClientSet

	// brain is where we inject updates
	brain *template.Brain

	// templates is the set of templates we are trying to dedup
	templates []*template.Template

	// leader tracks if we are currently the leader
	leader     map[*template.Template]<-chan struct{}
	leaderLock sync.RWMutex

	// lastWrite tracks the hash of the data paths
	lastWrite     map[*template.Template]uint64
	lastWriteLock sync.RWMutex

	// updateCh is used to signal an update to watched data
	updateCh chan struct{}

	// wg is used to wait for a clean shutdown
	wg sync.WaitGroup

	stop     bool
	stopCh   chan struct{}
	stopLock sync.Mutex
}

// NewDedupManager creates a new DedupManager
func NewDedupManager(config *config.DedupConfig, clients *dep.ClientSet, brain *template.Brain, templates []*template.Template) (*DedupManager, error) {
	d := &DedupManager{
		config:    config,
		clients:   clients,
		brain:     brain,
		templates: templates,
		leader:    make(map[*template.Template]<-chan struct{}),
		lastWrite: make(map[*template.Template]uint64),
		updateCh:  make(chan struct{}, 1),
		stopCh:    make(chan struct{}),
	}
	return d, nil
}
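
// A minimal usage sketch, not part of the original docs: it assumes conf,
// clients, brain, and tmpls are already constructed, and it elides error
// handling.
//
//	d, _ := NewDedupManager(conf, clients, brain, tmpls)
//	_ = d.Start()
//	defer d.Stop()
//	for range d.UpdateCh() {
//		// Re-render templates here; d.IsLeader(tmpl) reports whether this
//		// instance currently holds the lock for a given template.
//	}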

// Start is used to start the de-duplication manager
func (d *DedupManager) Start() error {
	log.Printf("[INFO] (dedup) starting de-duplication manager")

	client := d.clients.Consul()
	go d.createSession(client)

	// Start to watch each template
	for _, t := range d.templates {
		go d.watchTemplate(client, t)
	}
	return nil
}

// Stop is used to stop the de-duplication manager
func (d *DedupManager) Stop() error {
	d.stopLock.Lock()
	defer d.stopLock.Unlock()
	if d.stop {
		return nil
	}

	log.Printf("[INFO] (dedup) stopping de-duplication manager")
	d.stop = true
	close(d.stopCh)
	d.wg.Wait()
	return nil
}

// createSession is used to create and maintain a session to Consul
func (d *DedupManager) createSession(client *consulapi.Client) {
START:
	log.Printf("[INFO] (dedup) attempting to create session")
	session := client.Session()
	sessionCh := make(chan struct{})
	ttl := fmt.Sprintf("%.6fs", float64(*d.config.TTL)/float64(time.Second))
	se := &consulapi.SessionEntry{
		Name:      "Consul-Template de-duplication",
		Behavior:  "delete",
		TTL:       ttl,
		LockDelay: 1 * time.Millisecond,
	}
	id, _, err := session.Create(se, nil)
	if err != nil {
		log.Printf("[ERR] (dedup) failed to create session: %v", err)
		goto WAIT
	}
	log.Printf("[INFO] (dedup) created session %s", id)

	// Attempt to lock each template
	for _, t := range d.templates {
		d.wg.Add(1)
		go d.attemptLock(client, id, sessionCh, t)
	}

	// Renew our session periodically
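	// RenewPeriodic blocks, renewing the session, until the session is
	// invalidated or d.stopCh closes; either way we fall through, signal the
	// lock holders via sessionCh, and wait for them to exit.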
	if err := session.RenewPeriodic("15s", id, nil, d.stopCh); err != nil {
		log.Printf("[ERR] (dedup) failed to renew session: %v", err)
	}
	close(sessionCh)
	d.wg.Wait()

WAIT:
	select {
	case <-time.After(sessionCreateRetry):
		goto START
	case <-d.stopCh:
		return
	}
}

// IsLeader checks if we are currently the leader instance
func (d *DedupManager) IsLeader(tmpl *template.Template) bool {
	d.leaderLock.RLock()
	defer d.leaderLock.RUnlock()

	lockCh, ok := d.leader[tmpl]
	if !ok {
		return false
	}
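
	// The lock channel is closed when leadership is lost, so a successful
	// receive means we are no longer the leader; a blocked receive (the
	// default case) means we still hold the lock.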
	select {
	case <-lockCh:
		return false
	default:
		return true
	}
}

// UpdateDeps is used to update the values of the dependencies for a template
func (d *DedupManager) UpdateDeps(t *template.Template, deps []dep.Dependency) error {
	// Calculate the path to write updates to
	dataPath := path.Join(*d.config.Prefix, t.ID(), "data")

	// Package up the dependency data
	td := templateData{
		Version: version.Version,
		Data:    make(map[string]interface{}),
	}
	for _, dp := range deps {
		// Skip any dependencies that can't be shared
		if !dp.CanShare() {
			continue
		}

		// Pull the current value from the brain
		val, ok := d.brain.Recall(dp)
		if ok {
			td.Data[dp.String()] = val
		}
	}

	// Compute a stable hash of the data. Note that we don't compute this over
	// the actual encoded value, since gob encoding does not guarantee stable
	// ordering for maps and would therefore spuriously return a different
	// hash most of the time. See
	// https://github.com/hashicorp/consul-template/issues/1099.
	hash, err := hashstructure.Hash(td, nil)
	if err != nil {
		return fmt.Errorf("calculating hash failed: %v", err)
	}
	d.lastWriteLock.RLock()
	existing, ok := d.lastWrite[t]
	d.lastWriteLock.RUnlock()
	if ok && existing == hash {
		log.Printf("[INFO] (dedup) de-duplicate data '%s' already current",
			dataPath)
		return nil
	}

	// Encode via GOB and LZW compress
	var buf bytes.Buffer
	compress := lzw.NewWriter(&buf, lzw.LSB, 8)
	enc := gob.NewEncoder(compress)
	if err := enc.Encode(&td); err != nil {
		return fmt.Errorf("encode failed: %v", err)
	}
	compress.Close()

	// Write the KV update
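	// Flags is set to LockFlagValue so the pair is marked as written by a
	// lock holder; watchTemplate checks this flag before parsing the data.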
	kvPair := consulapi.KVPair{
		Key:   dataPath,
		Value: buf.Bytes(),
		Flags: consulapi.LockFlagValue,
	}
	client := d.clients.Consul()
	if _, err := client.KV().Put(&kvPair, nil); err != nil {
		return fmt.Errorf("failed to write '%s': %v", dataPath, err)
	}
	log.Printf("[INFO] (dedup) updated de-duplicate data '%s'", dataPath)
	d.lastWriteLock.Lock()
	d.lastWrite[t] = hash
	d.lastWriteLock.Unlock()
	return nil
}
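
// The value written above is GOB wrapped in LZW; parseData reverses it. A
// round trip looks roughly like this sketch (error handling elided, td is a
// templateData):
//
//	var buf bytes.Buffer
//	w := lzw.NewWriter(&buf, lzw.LSB, 8)
//	_ = gob.NewEncoder(w).Encode(&td)
//	w.Close()
//
//	r := lzw.NewReader(bytes.NewReader(buf.Bytes()), lzw.LSB, 8)
//	var out templateData
//	_ = gob.NewDecoder(r).Decode(&out)
//	r.Close()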

// UpdateCh returns a channel to watch for dependency updates
func (d *DedupManager) UpdateCh() <-chan struct{} {
	return d.updateCh
}

// setLeader records whether we are currently the leader instance for a template
func (d *DedupManager) setLeader(tmpl *template.Template, lockCh <-chan struct{}) {
	// Update the lock state
	d.leaderLock.Lock()
	if lockCh != nil {
		d.leader[tmpl] = lockCh
	} else {
		delete(d.leader, tmpl)
	}
	d.leaderLock.Unlock()

	// Clear the lastWrite hash if we've lost leadership
	if lockCh == nil {
		d.lastWriteLock.Lock()
		delete(d.lastWrite, tmpl)
		d.lastWriteLock.Unlock()
	}

	// Do an async notify of an update; updateCh is buffered, so if a
	// notification is already pending this send is dropped and the
	// notifications coalesce.
	select {
	case d.updateCh <- struct{}{}:
	default:
	}
}

func (d *DedupManager) watchTemplate(client *consulapi.Client, t *template.Template) {
	log.Printf("[INFO] (dedup) starting watch for template hash %s", t.ID())
	path := path.Join(*d.config.Prefix, t.ID(), "data")

	// Determine if stale queries are allowed
	var allowStale bool
	if *d.config.MaxStale != 0 {
		allowStale = true
	}

	// Setup our query options
	opts := &consulapi.QueryOptions{
		AllowStale: allowStale,
		WaitTime:   60 * time.Second,
	}

	var lastData []byte
	var lastIndex uint64

START:
	// Stop listening if we're stopped
	select {
	case <-d.stopCh:
		return
	default:
	}

	// If we are currently the leader, wait for leadership to be lost
	d.leaderLock.RLock()
	lockCh, ok := d.leader[t]
	d.leaderLock.RUnlock()
	if ok {
		select {
		case <-lockCh:
			goto START
		case <-d.stopCh:
			return
		}
	}

	// Block for updates on the data key
	log.Printf("[INFO] (dedup) listing data for template hash %s", t.ID())
	pair, meta, err := client.KV().Get(path, opts)
	if err != nil {
		log.Printf("[ERR] (dedup) failed to get '%s': %v", path, err)
		select {
		case <-time.After(listRetry):
			goto START
		case <-d.stopCh:
			return
		}
	}
	opts.WaitIndex = meta.LastIndex
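	// Setting WaitIndex turns the next Get into a blocking query: it returns
	// only once the index advances past LastIndex or WaitTime elapses.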

	// Stop listening if we're stopped
	select {
	case <-d.stopCh:
		return
	default:
	}

	// If we've exceeded the maximum staleness, retry without stale
	if allowStale && meta.LastContact > *d.config.MaxStale {
		allowStale = false
		log.Printf("[DEBUG] (dedup) %s stale data (last contact exceeded max_stale)", path)
		goto START
	}

	// Re-enable stale queries if allowed
	if *d.config.MaxStale > 0 {
		allowStale = true
	}

	if meta.LastIndex == lastIndex {
		log.Printf("[TRACE] (dedup) %s no new data (index was the same)", path)
		goto START
	}

	if meta.LastIndex < lastIndex {
		log.Printf("[TRACE] (dedup) %s had a lower index, resetting", path)
		lastIndex = 0
		goto START
	}
	lastIndex = meta.LastIndex

	var data []byte
	if pair != nil {
		data = pair.Value
	}
	if bytes.Equal(lastData, data) {
		log.Printf("[TRACE] (dedup) %s no new data (contents were the same)", path)
		goto START
	}
	lastData = data

	// If we are currently the leader, wait for leadership to be lost
	d.leaderLock.RLock()
	lockCh, ok = d.leader[t]
	d.leaderLock.RUnlock()
	if ok {
		select {
		case <-lockCh:
			goto START
		case <-d.stopCh:
			return
		}
	}

	// Parse the data file
	if pair != nil && pair.Flags == consulapi.LockFlagValue && !bytes.Equal(pair.Value, templateNoData()) {
		d.parseData(pair.Key, pair.Value)
	}
	goto START
}

// parseData is used to update the brain from a KV data pair
func (d *DedupManager) parseData(path string, raw []byte) {
	// Setup the decompression and decoders
	r := bytes.NewReader(raw)
	decompress := lzw.NewReader(r, lzw.LSB, 8)
	defer decompress.Close()
	dec := gob.NewDecoder(decompress)

	// Decode the data
	var td templateData
	if err := dec.Decode(&td); err != nil {
		log.Printf("[ERR] (dedup) failed to decode '%s': %v",
			path, err)
		return
	}
	if td.Version != version.Version {
		log.Printf("[WARN] (dedup) created with different version (%s vs %s)",
			td.Version, version.Version)
		return
	}
	log.Printf("[INFO] (dedup) loading %d dependencies from '%s'",
		len(td.Data), path)

	// Update the data in the brain
	for hashCode, value := range td.Data {
		d.brain.ForceSet(hashCode, value)
	}

	// Trigger the updateCh
	select {
	case d.updateCh <- struct{}{}:
	default:
	}
}

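// attemptLock is used to try to acquire the per-template lock, retrying on
// failure until the session is lost or the manager is stopped.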
func (d *DedupManager) attemptLock(client *consulapi.Client, session string, sessionCh chan struct{}, t *template.Template) {
	defer d.wg.Done()
	for {
		log.Printf("[INFO] (dedup) attempting lock for template hash %s", t.ID())
		basePath := path.Join(*d.config.Prefix, t.ID())
		lopts := &consulapi.LockOptions{
			Key:              path.Join(basePath, "data"),
			Value:            templateNoData(),
			Session:          session,
			MonitorRetries:   3,
			MonitorRetryTime: 3 * time.Second,
			LockWaitTime:     lockWaitTime,
		}
		lock, err := client.LockOpts(lopts)
		if err != nil {
			log.Printf("[ERR] (dedup) failed to create lock '%s': %v",
				lopts.Key, err)
			return
		}

		var retryCh <-chan time.Time
		leaderCh, err := lock.Lock(sessionCh)
		if err != nil {
			log.Printf("[ERR] (dedup) failed to acquire lock '%s': %v",
				lopts.Key, err)
			retryCh = time.After(lockRetry)
		} else {
			log.Printf("[INFO] (dedup) acquired lock '%s'", lopts.Key)
			d.setLeader(t, leaderCh)
		}

		select {
		case <-retryCh:
			retryCh = nil
			continue
		case <-leaderCh:
			log.Printf("[WARN] (dedup) lost lock ownership '%s'", lopts.Key)
			d.setLeader(t, nil)
			continue
		case <-sessionCh:
			log.Printf("[INFO] (dedup) session lost, releasing lock '%s'", lopts.Key)
			d.setLeader(t, nil)
			_, err = client.Session().Destroy(session, nil)
			if err != nil {
				log.Printf("[ERR] (dedup) failed destroying session '%s': %s", session, err)
			}
			return
		case <-d.stopCh:
			log.Printf("[INFO] (dedup) releasing lock '%s'", lopts.Key)
			_, err = client.Session().Destroy(session, nil)
			if err != nil {
				log.Printf("[ERR] (dedup) failed destroying session '%s': %s", session, err)
			}
			return
		}
	}
}
