1package manager 2 3import ( 4 "bytes" 5 "compress/lzw" 6 "encoding/gob" 7 "fmt" 8 "log" 9 "path" 10 "sync" 11 "time" 12 13 "github.com/mitchellh/hashstructure" 14 15 "github.com/hashicorp/consul-template/config" 16 dep "github.com/hashicorp/consul-template/dependency" 17 "github.com/hashicorp/consul-template/template" 18 "github.com/hashicorp/consul-template/version" 19 consulapi "github.com/hashicorp/consul/api" 20) 21 22var ( 23 // sessionCreateRetry is the amount of time we wait 24 // to recreate a session when lost. 25 sessionCreateRetry = 15 * time.Second 26 27 // lockRetry is the interval on which we try to re-acquire locks 28 lockRetry = 10 * time.Second 29 30 // listRetry is the interval on which we retry listing a data path 31 listRetry = 10 * time.Second 32 33 // timeout passed through to consul api client Lock 34 // here to override in testing (see ./dedup_test.go) 35 lockWaitTime = 15 * time.Second 36) 37 38const ( 39 templateNoDataStr = "__NO_DATA__" 40) 41 42// templateData is GOB encoded share the dependency values 43type templateData struct { 44 // Version is the version of Consul Template which created this template data. 45 // This is important because users may be running multiple versions of CT 46 // with the same templates. This provides a nicer upgrade path. 47 Version string 48 49 // Data is the actual template data. 50 Data map[string]interface{} 51} 52 53func templateNoData() []byte { 54 return []byte(templateNoDataStr) 55} 56 57// DedupManager is used to de-duplicate which instance of Consul-Template 58// is handling each template. For each template, a lock path is determined 59// using the MD5 of the template. This path is used to elect a "leader" 60// instance. 61// 62// The leader instance operations like usual, but any time a template is 63// rendered, any of the data required for rendering is stored in the 64// Consul KV store under the lock path. 65// 66// The follower instances depend on the leader to do the primary watching 67// and rendering, and instead only watch the aggregated data in the KV. 68// Followers wait for updates and re-render the template. 69// 70// If a template depends on 50 views, and is running on 50 machines, that 71// would normally require 2500 blocking queries. Using deduplication, one 72// instance has 50 view queries, plus 50 additional queries on the lock 73// path for a total of 100. 74// 75type DedupManager struct { 76 // config is the deduplicate configuration 77 config *config.DedupConfig 78 79 // clients is used to access the underlying clients 80 clients *dep.ClientSet 81 82 // Brain is where we inject updates 83 brain *template.Brain 84 85 // templates is the set of templates we are trying to dedup 86 templates []*template.Template 87 88 // leader tracks if we are currently the leader 89 leader map[*template.Template]<-chan struct{} 90 leaderLock sync.RWMutex 91 92 // lastWrite tracks the hash of the data paths 93 lastWrite map[*template.Template]uint64 94 lastWriteLock sync.RWMutex 95 96 // updateCh is used to indicate an update watched data 97 updateCh chan struct{} 98 99 // wg is used to wait for a clean shutdown 100 wg sync.WaitGroup 101 102 stop bool 103 stopCh chan struct{} 104 stopLock sync.Mutex 105} 106 107// NewDedupManager creates a new Dedup manager 108func NewDedupManager(config *config.DedupConfig, clients *dep.ClientSet, brain *template.Brain, templates []*template.Template) (*DedupManager, error) { 109 d := &DedupManager{ 110 config: config, 111 clients: clients, 112 brain: brain, 113 templates: templates, 114 leader: make(map[*template.Template]<-chan struct{}), 115 lastWrite: make(map[*template.Template]uint64), 116 updateCh: make(chan struct{}, 1), 117 stopCh: make(chan struct{}), 118 } 119 return d, nil 120} 121 122// Start is used to start the de-duplication manager 123func (d *DedupManager) Start() error { 124 log.Printf("[INFO] (dedup) starting de-duplication manager") 125 126 client := d.clients.Consul() 127 go d.createSession(client) 128 129 // Start to watch each template 130 for _, t := range d.templates { 131 go d.watchTemplate(client, t) 132 } 133 return nil 134} 135 136// Stop is used to stop the de-duplication manager 137func (d *DedupManager) Stop() error { 138 d.stopLock.Lock() 139 defer d.stopLock.Unlock() 140 if d.stop { 141 return nil 142 } 143 144 log.Printf("[INFO] (dedup) stopping de-duplication manager") 145 d.stop = true 146 close(d.stopCh) 147 d.wg.Wait() 148 return nil 149} 150 151// createSession is used to create and maintain a session to Consul 152func (d *DedupManager) createSession(client *consulapi.Client) { 153START: 154 log.Printf("[INFO] (dedup) attempting to create session") 155 session := client.Session() 156 sessionCh := make(chan struct{}) 157 ttl := fmt.Sprintf("%.6fs", float64(*d.config.TTL)/float64(time.Second)) 158 se := &consulapi.SessionEntry{ 159 Name: "Consul-Template de-duplication", 160 Behavior: "delete", 161 TTL: ttl, 162 LockDelay: 1 * time.Millisecond, 163 } 164 id, _, err := session.Create(se, nil) 165 if err != nil { 166 log.Printf("[ERR] (dedup) failed to create session: %v", err) 167 goto WAIT 168 } 169 log.Printf("[INFO] (dedup) created session %s", id) 170 171 // Attempt to lock each template 172 for _, t := range d.templates { 173 d.wg.Add(1) 174 go d.attemptLock(client, id, sessionCh, t) 175 } 176 177 // Renew our session periodically 178 if err := session.RenewPeriodic("15s", id, nil, d.stopCh); err != nil { 179 log.Printf("[ERR] (dedup) failed to renew session: %v", err) 180 } 181 close(sessionCh) 182 d.wg.Wait() 183 184WAIT: 185 select { 186 case <-time.After(sessionCreateRetry): 187 goto START 188 case <-d.stopCh: 189 return 190 } 191} 192 193// IsLeader checks if we are currently the leader instance 194func (d *DedupManager) IsLeader(tmpl *template.Template) bool { 195 d.leaderLock.RLock() 196 defer d.leaderLock.RUnlock() 197 198 lockCh, ok := d.leader[tmpl] 199 if !ok { 200 return false 201 } 202 select { 203 case <-lockCh: 204 return false 205 default: 206 return true 207 } 208} 209 210// UpdateDeps is used to update the values of the dependencies for a template 211func (d *DedupManager) UpdateDeps(t *template.Template, deps []dep.Dependency) error { 212 // Calculate the path to write updates to 213 dataPath := path.Join(*d.config.Prefix, t.ID(), "data") 214 215 // Package up the dependency data 216 td := templateData{ 217 Version: version.Version, 218 Data: make(map[string]interface{}), 219 } 220 for _, dp := range deps { 221 // Skip any dependencies that can't be shared 222 if !dp.CanShare() { 223 continue 224 } 225 226 // Pull the current value from the brain 227 val, ok := d.brain.Recall(dp) 228 if ok { 229 td.Data[dp.String()] = val 230 } 231 } 232 233 // Compute stable hash of the data. Note we don't compute this over the actual 234 // encoded value since gob encoding does not guarantee stable ordering for 235 // maps so spuriously returns a different hash most times. See 236 // https://github.com/hashicorp/consul-template/issues/1099. 237 hash, err := hashstructure.Hash(td, nil) 238 if err != nil { 239 return fmt.Errorf("calculating hash failed: %v", err) 240 } 241 d.lastWriteLock.RLock() 242 existing, ok := d.lastWrite[t] 243 d.lastWriteLock.RUnlock() 244 if ok && existing == hash { 245 log.Printf("[INFO] (dedup) de-duplicate data '%s' already current", 246 dataPath) 247 return nil 248 } 249 250 // Encode via GOB and LZW compress 251 var buf bytes.Buffer 252 compress := lzw.NewWriter(&buf, lzw.LSB, 8) 253 enc := gob.NewEncoder(compress) 254 if err := enc.Encode(&td); err != nil { 255 return fmt.Errorf("encode failed: %v", err) 256 } 257 compress.Close() 258 259 // Write the KV update 260 kvPair := consulapi.KVPair{ 261 Key: dataPath, 262 Value: buf.Bytes(), 263 Flags: consulapi.LockFlagValue, 264 } 265 client := d.clients.Consul() 266 if _, err := client.KV().Put(&kvPair, nil); err != nil { 267 return fmt.Errorf("failed to write '%s': %v", dataPath, err) 268 } 269 log.Printf("[INFO] (dedup) updated de-duplicate data '%s'", dataPath) 270 d.lastWriteLock.Lock() 271 d.lastWrite[t] = hash 272 d.lastWriteLock.Unlock() 273 return nil 274} 275 276// UpdateCh returns a channel to watch for dependency updates 277func (d *DedupManager) UpdateCh() <-chan struct{} { 278 return d.updateCh 279} 280 281// setLeader sets if we are currently the leader instance 282func (d *DedupManager) setLeader(tmpl *template.Template, lockCh <-chan struct{}) { 283 // Update the lock state 284 d.leaderLock.Lock() 285 if lockCh != nil { 286 d.leader[tmpl] = lockCh 287 } else { 288 delete(d.leader, tmpl) 289 } 290 d.leaderLock.Unlock() 291 292 // Clear the lastWrite hash if we've lost leadership 293 if lockCh == nil { 294 d.lastWriteLock.Lock() 295 delete(d.lastWrite, tmpl) 296 d.lastWriteLock.Unlock() 297 } 298 299 // Do an async notify of an update 300 select { 301 case d.updateCh <- struct{}{}: 302 default: 303 } 304} 305 306func (d *DedupManager) watchTemplate(client *consulapi.Client, t *template.Template) { 307 log.Printf("[INFO] (dedup) starting watch for template hash %s", t.ID()) 308 path := path.Join(*d.config.Prefix, t.ID(), "data") 309 310 // Determine if stale queries are allowed 311 var allowStale bool 312 if *d.config.MaxStale != 0 { 313 allowStale = true 314 } 315 316 // Setup our query options 317 opts := &consulapi.QueryOptions{ 318 AllowStale: allowStale, 319 WaitTime: 60 * time.Second, 320 } 321 322 var lastData []byte 323 var lastIndex uint64 324 325START: 326 // Stop listening if we're stopped 327 select { 328 case <-d.stopCh: 329 return 330 default: 331 } 332 333 // If we are current the leader, wait for leadership lost 334 d.leaderLock.RLock() 335 lockCh, ok := d.leader[t] 336 d.leaderLock.RUnlock() 337 if ok { 338 select { 339 case <-lockCh: 340 goto START 341 case <-d.stopCh: 342 return 343 } 344 } 345 346 // Block for updates on the data key 347 log.Printf("[INFO] (dedup) listing data for template hash %s", t.ID()) 348 pair, meta, err := client.KV().Get(path, opts) 349 if err != nil { 350 log.Printf("[ERR] (dedup) failed to get '%s': %v", path, err) 351 select { 352 case <-time.After(listRetry): 353 goto START 354 case <-d.stopCh: 355 return 356 } 357 } 358 opts.WaitIndex = meta.LastIndex 359 360 // Stop listening if we're stopped 361 select { 362 case <-d.stopCh: 363 return 364 default: 365 } 366 367 // If we've exceeded the maximum staleness, retry without stale 368 if allowStale && meta.LastContact > *d.config.MaxStale { 369 allowStale = false 370 log.Printf("[DEBUG] (dedup) %s stale data (last contact exceeded max_stale)", path) 371 goto START 372 } 373 374 // Re-enable stale queries if allowed 375 if *d.config.MaxStale > 0 { 376 allowStale = true 377 } 378 379 if meta.LastIndex == lastIndex { 380 log.Printf("[TRACE] (dedup) %s no new data (index was the same)", path) 381 goto START 382 } 383 384 if meta.LastIndex < lastIndex { 385 log.Printf("[TRACE] (dedup) %s had a lower index, resetting", path) 386 lastIndex = 0 387 goto START 388 } 389 lastIndex = meta.LastIndex 390 391 var data []byte 392 if pair != nil { 393 data = pair.Value 394 } 395 if bytes.Equal(lastData, data) { 396 log.Printf("[TRACE] (dedup) %s no new data (contents were the same)", path) 397 goto START 398 } 399 lastData = data 400 401 // If we are current the leader, wait for leadership lost 402 d.leaderLock.RLock() 403 lockCh, ok = d.leader[t] 404 d.leaderLock.RUnlock() 405 if ok { 406 select { 407 case <-lockCh: 408 goto START 409 case <-d.stopCh: 410 return 411 } 412 } 413 414 // Parse the data file 415 if pair != nil && pair.Flags == consulapi.LockFlagValue && !bytes.Equal(pair.Value, templateNoData()) { 416 d.parseData(pair.Key, pair.Value) 417 } 418 goto START 419} 420 421// parseData is used to update brain from a KV data pair 422func (d *DedupManager) parseData(path string, raw []byte) { 423 // Setup the decompression and decoders 424 r := bytes.NewReader(raw) 425 decompress := lzw.NewReader(r, lzw.LSB, 8) 426 defer decompress.Close() 427 dec := gob.NewDecoder(decompress) 428 429 // Decode the data 430 var td templateData 431 if err := dec.Decode(&td); err != nil { 432 log.Printf("[ERR] (dedup) failed to decode '%s': %v", 433 path, err) 434 return 435 } 436 if td.Version != version.Version { 437 log.Printf("[WARN] (dedup) created with different version (%s vs %s)", 438 td.Version, version.Version) 439 return 440 } 441 log.Printf("[INFO] (dedup) loading %d dependencies from '%s'", 442 len(td.Data), path) 443 444 // Update the data in the brain 445 for hashCode, value := range td.Data { 446 d.brain.ForceSet(hashCode, value) 447 } 448 449 // Trigger the updateCh 450 select { 451 case d.updateCh <- struct{}{}: 452 default: 453 } 454} 455 456func (d *DedupManager) attemptLock(client *consulapi.Client, session string, sessionCh chan struct{}, t *template.Template) { 457 defer d.wg.Done() 458 for { 459 log.Printf("[INFO] (dedup) attempting lock for template hash %s", t.ID()) 460 basePath := path.Join(*d.config.Prefix, t.ID()) 461 lopts := &consulapi.LockOptions{ 462 Key: path.Join(basePath, "data"), 463 Value: templateNoData(), 464 Session: session, 465 MonitorRetries: 3, 466 MonitorRetryTime: 3 * time.Second, 467 LockWaitTime: lockWaitTime, 468 } 469 lock, err := client.LockOpts(lopts) 470 if err != nil { 471 log.Printf("[ERR] (dedup) failed to create lock '%s': %v", 472 lopts.Key, err) 473 return 474 } 475 476 var retryCh <-chan time.Time 477 leaderCh, err := lock.Lock(sessionCh) 478 if err != nil { 479 log.Printf("[ERR] (dedup) failed to acquire lock '%s': %v", 480 lopts.Key, err) 481 retryCh = time.After(lockRetry) 482 } else { 483 log.Printf("[INFO] (dedup) acquired lock '%s'", lopts.Key) 484 d.setLeader(t, leaderCh) 485 } 486 487 select { 488 case <-retryCh: 489 retryCh = nil 490 continue 491 case <-leaderCh: 492 log.Printf("[WARN] (dedup) lost lock ownership '%s'", lopts.Key) 493 d.setLeader(t, nil) 494 continue 495 case <-sessionCh: 496 log.Printf("[INFO] (dedup) releasing session '%s'", lopts.Key) 497 d.setLeader(t, nil) 498 _, err = client.Session().Destroy(session, nil) 499 if err != nil { 500 log.Printf("[ERROR] (dedup) failed destroying session '%s', %s", session, err) 501 } 502 return 503 case <-d.stopCh: 504 log.Printf("[INFO] (dedup) releasing lock '%s'", lopts.Key) 505 _, err = client.Session().Destroy(session, nil) 506 if err != nil { 507 log.Printf("[ERROR] (dedup) failed destroying session '%s', %s", session, err) 508 } 509 return 510 } 511 } 512} 513