1package watch 2 3import ( 4 "log" 5 "sync" 6 "time" 7 8 dep "github.com/hashicorp/consul-template/dependency" 9 "github.com/pkg/errors" 10) 11 12// dataBufferSize is the default number of views to process in a batch. 13const dataBufferSize = 2048 14 15type RetryFunc func(int) (bool, time.Duration) 16 17// Watcher is a top-level manager for views that poll Consul for data. 18type Watcher struct { 19 sync.Mutex 20 21 // clients is the collection of API clients to talk to upstreams. 22 clients *dep.ClientSet 23 24 // dataCh is the chan where Views will be published. 25 dataCh chan *View 26 27 // errCh is the chan where any errors will be published. 28 errCh chan error 29 30 // depViewMap is a map of Templates to Views. Templates are keyed by 31 // their string. 32 depViewMap map[string]*View 33 34 // maxStale specifies the maximum staleness of a query response. 35 maxStale time.Duration 36 37 // once signals if this watcher should tell views to retrieve data exactly 38 // one time instead of polling infinitely. 39 once bool 40 41 // retryFuncs specifies the different ways to retry based on the upstream. 42 retryFuncConsul RetryFunc 43 retryFuncDefault RetryFunc 44 retryFuncVault RetryFunc 45 46 // vaultGrace is the grace period between a lease and the max TTL for which 47 // Consul Template will generate a new secret instead of renewing an existing 48 // one. 49 vaultGrace time.Duration 50} 51 52type NewWatcherInput struct { 53 // Clients is the client set to communicate with upstreams. 54 Clients *dep.ClientSet 55 56 // MaxStale is the maximum staleness of a query. 57 MaxStale time.Duration 58 59 // Once specifies this watcher should tell views to poll exactly once. 60 Once bool 61 62 // RenewVault indicates if this watcher should renew Vault tokens. 63 RenewVault bool 64 65 // VaultToken is the vault token to renew. 66 VaultToken string 67 68 // VaultAgentTokenFile is the path to Vault Agent token file 69 VaultAgentTokenFile string 70 71 // RetryFuncs specify the different ways to retry based on the upstream. 72 RetryFuncConsul RetryFunc 73 RetryFuncDefault RetryFunc 74 RetryFuncVault RetryFunc 75 76 // VaultGrace is the grace period between a lease and the max TTL for which 77 // Consul Template will generate a new secret instead of renewing an existing 78 // one. 79 VaultGrace time.Duration 80} 81 82// NewWatcher creates a new watcher using the given API client. 83func NewWatcher(i *NewWatcherInput) (*Watcher, error) { 84 w := &Watcher{ 85 clients: i.Clients, 86 depViewMap: make(map[string]*View), 87 dataCh: make(chan *View, dataBufferSize), 88 errCh: make(chan error), 89 maxStale: i.MaxStale, 90 once: i.Once, 91 retryFuncConsul: i.RetryFuncConsul, 92 retryFuncDefault: i.RetryFuncDefault, 93 retryFuncVault: i.RetryFuncVault, 94 vaultGrace: i.VaultGrace, 95 } 96 97 // Start a watcher for the Vault renew if that config was specified 98 if i.RenewVault { 99 vt, err := dep.NewVaultTokenQuery(i.VaultToken) 100 if err != nil { 101 return nil, errors.Wrap(err, "watcher") 102 } 103 if _, err := w.Add(vt); err != nil { 104 return nil, errors.Wrap(err, "watcher") 105 } 106 } 107 108 if len(i.VaultAgentTokenFile) > 0 { 109 vag, err := dep.NewVaultAgentTokenQuery(i.VaultAgentTokenFile) 110 if err != nil { 111 return nil, errors.Wrap(err, "watcher") 112 } 113 if _, err := w.Add(vag); err != nil { 114 return nil, errors.Wrap(err, "watcher") 115 } 116 } 117 118 return w, nil 119} 120 121// DataCh returns a read-only channel of Views which is populated when a view 122// receives data from its upstream. 123func (w *Watcher) DataCh() <-chan *View { 124 return w.dataCh 125} 126 127// ErrCh returns a read-only channel of errors returned by the upstream. 128func (w *Watcher) ErrCh() <-chan error { 129 return w.errCh 130} 131 132// Add adds the given dependency to the list of monitored dependencies 133// and start the associated view. If the dependency already exists, no action is 134// taken. 135// 136// If the Dependency already existed, it this function will return false. If the 137// view was successfully created, it will return true. If an error occurs while 138// creating the view, it will be returned here (but future errors returned by 139// the view will happen on the channel). 140func (w *Watcher) Add(d dep.Dependency) (bool, error) { 141 w.Lock() 142 defer w.Unlock() 143 144 log.Printf("[DEBUG] (watcher) adding %s", d) 145 146 if _, ok := w.depViewMap[d.String()]; ok { 147 log.Printf("[TRACE] (watcher) %s already exists, skipping", d) 148 return false, nil 149 } 150 151 // Choose the correct retry function based off of the dependency's type. 152 var retryFunc RetryFunc 153 switch d.Type() { 154 case dep.TypeConsul: 155 retryFunc = w.retryFuncConsul 156 case dep.TypeVault: 157 retryFunc = w.retryFuncVault 158 default: 159 retryFunc = w.retryFuncDefault 160 } 161 162 v, err := NewView(&NewViewInput{ 163 Dependency: d, 164 Clients: w.clients, 165 MaxStale: w.maxStale, 166 Once: w.once, 167 RetryFunc: retryFunc, 168 VaultGrace: w.vaultGrace, 169 }) 170 if err != nil { 171 return false, errors.Wrap(err, "watcher") 172 } 173 174 log.Printf("[TRACE] (watcher) %s starting", d) 175 176 w.depViewMap[d.String()] = v 177 go v.poll(w.dataCh, w.errCh) 178 179 return true, nil 180} 181 182// Watching determines if the given dependency is being watched. 183func (w *Watcher) Watching(d dep.Dependency) bool { 184 w.Lock() 185 defer w.Unlock() 186 187 _, ok := w.depViewMap[d.String()] 188 return ok 189} 190 191// ForceWatching is used to force setting the internal state of watching 192// a dependency. This is only used for unit testing purposes. 193func (w *Watcher) ForceWatching(d dep.Dependency, enabled bool) { 194 w.Lock() 195 defer w.Unlock() 196 197 if enabled { 198 w.depViewMap[d.String()] = nil 199 } else { 200 delete(w.depViewMap, d.String()) 201 } 202} 203 204// Remove removes the given dependency from the list and stops the 205// associated View. If a View for the given dependency does not exist, this 206// function will return false. If the View does exist, this function will return 207// true upon successful deletion. 208func (w *Watcher) Remove(d dep.Dependency) bool { 209 w.Lock() 210 defer w.Unlock() 211 212 log.Printf("[DEBUG] (watcher) removing %s", d) 213 214 if view, ok := w.depViewMap[d.String()]; ok { 215 log.Printf("[TRACE] (watcher) actually removing %s", d) 216 view.stop() 217 delete(w.depViewMap, d.String()) 218 return true 219 } 220 221 log.Printf("[TRACE] (watcher) %s did not exist, skipping", d) 222 return false 223} 224 225// Size returns the number of views this watcher is watching. 226func (w *Watcher) Size() int { 227 w.Lock() 228 defer w.Unlock() 229 return len(w.depViewMap) 230} 231 232// Stop halts this watcher and any currently polling views immediately. If a 233// view was in the middle of a poll, no data will be returned. 234func (w *Watcher) Stop() { 235 w.Lock() 236 defer w.Unlock() 237 238 log.Printf("[DEBUG] (watcher) stopping all views") 239 240 for _, view := range w.depViewMap { 241 if view == nil { 242 continue 243 } 244 log.Printf("[TRACE] (watcher) stopping %s", view.Dependency()) 245 view.stop() 246 } 247 248 // Reset the map to have no views 249 w.depViewMap = make(map[string]*View) 250 251 // Close any idle TCP connections 252 w.clients.Stop() 253} 254