1package watch 2 3import ( 4 "context" 5 "fmt" 6 7 consulapi "github.com/hashicorp/consul/api" 8) 9 10// watchFactory is a function that can create a new WatchFunc 11// from a parameter configuration 12type watchFactory func(params map[string]interface{}) (WatcherFunc, error) 13 14// watchFuncFactory maps each type to a factory function 15var watchFuncFactory map[string]watchFactory 16 17func init() { 18 watchFuncFactory = map[string]watchFactory{ 19 "key": keyWatch, 20 "keyprefix": keyPrefixWatch, 21 "services": servicesWatch, 22 "nodes": nodesWatch, 23 "service": serviceWatch, 24 "checks": checksWatch, 25 "event": eventWatch, 26 "connect_roots": connectRootsWatch, 27 "connect_leaf": connectLeafWatch, 28 "agent_service": agentServiceWatch, 29 } 30} 31 32// keyWatch is used to return a key watching function 33func keyWatch(params map[string]interface{}) (WatcherFunc, error) { 34 stale := false 35 if err := assignValueBool(params, "stale", &stale); err != nil { 36 return nil, err 37 } 38 39 var key string 40 if err := assignValue(params, "key", &key); err != nil { 41 return nil, err 42 } 43 if key == "" { 44 return nil, fmt.Errorf("Must specify a single key to watch") 45 } 46 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 47 kv := p.client.KV() 48 opts := makeQueryOptionsWithContext(p, stale) 49 defer p.cancelFunc() 50 pair, meta, err := kv.Get(key, &opts) 51 if err != nil { 52 return nil, nil, err 53 } 54 if pair == nil { 55 return WaitIndexVal(meta.LastIndex), nil, err 56 } 57 return WaitIndexVal(meta.LastIndex), pair, err 58 } 59 return fn, nil 60} 61 62// keyPrefixWatch is used to return a key prefix watching function 63func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { 64 stale := false 65 if err := assignValueBool(params, "stale", &stale); err != nil { 66 return nil, err 67 } 68 69 var prefix string 70 if err := assignValue(params, "prefix", &prefix); err != nil { 71 return nil, err 72 } 73 if prefix == "" { 74 return nil, fmt.Errorf("Must specify a single prefix to watch") 75 } 76 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 77 kv := p.client.KV() 78 opts := makeQueryOptionsWithContext(p, stale) 79 defer p.cancelFunc() 80 pairs, meta, err := kv.List(prefix, &opts) 81 if err != nil { 82 return nil, nil, err 83 } 84 return WaitIndexVal(meta.LastIndex), pairs, err 85 } 86 return fn, nil 87} 88 89// servicesWatch is used to watch the list of available services 90func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { 91 stale := false 92 if err := assignValueBool(params, "stale", &stale); err != nil { 93 return nil, err 94 } 95 96 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 97 catalog := p.client.Catalog() 98 opts := makeQueryOptionsWithContext(p, stale) 99 defer p.cancelFunc() 100 services, meta, err := catalog.Services(&opts) 101 if err != nil { 102 return nil, nil, err 103 } 104 return WaitIndexVal(meta.LastIndex), services, err 105 } 106 return fn, nil 107} 108 109// nodesWatch is used to watch the list of available nodes 110func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { 111 stale := false 112 if err := assignValueBool(params, "stale", &stale); err != nil { 113 return nil, err 114 } 115 116 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 117 catalog := p.client.Catalog() 118 opts := makeQueryOptionsWithContext(p, stale) 119 defer p.cancelFunc() 120 nodes, meta, err := catalog.Nodes(&opts) 121 if err != nil { 122 return nil, nil, err 123 } 124 return WaitIndexVal(meta.LastIndex), nodes, err 125 } 126 return fn, nil 127} 128 129// serviceWatch is used to watch a specific service for changes 130func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { 131 stale := false 132 if err := assignValueBool(params, "stale", &stale); err != nil { 133 return nil, err 134 } 135 136 var ( 137 service string 138 tags []string 139 ) 140 if err := assignValue(params, "service", &service); err != nil { 141 return nil, err 142 } 143 if service == "" { 144 return nil, fmt.Errorf("Must specify a single service to watch") 145 } 146 if err := assignValueStringSlice(params, "tag", &tags); err != nil { 147 return nil, err 148 } 149 150 passingOnly := false 151 if err := assignValueBool(params, "passingonly", &passingOnly); err != nil { 152 return nil, err 153 } 154 155 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 156 health := p.client.Health() 157 opts := makeQueryOptionsWithContext(p, stale) 158 defer p.cancelFunc() 159 nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts) 160 if err != nil { 161 return nil, nil, err 162 } 163 return WaitIndexVal(meta.LastIndex), nodes, err 164 } 165 return fn, nil 166} 167 168// checksWatch is used to watch a specific checks in a given state 169func checksWatch(params map[string]interface{}) (WatcherFunc, error) { 170 stale := false 171 if err := assignValueBool(params, "stale", &stale); err != nil { 172 return nil, err 173 } 174 175 var service, state string 176 if err := assignValue(params, "service", &service); err != nil { 177 return nil, err 178 } 179 if err := assignValue(params, "state", &state); err != nil { 180 return nil, err 181 } 182 if service != "" && state != "" { 183 return nil, fmt.Errorf("Cannot specify service and state") 184 } 185 if service == "" && state == "" { 186 state = "any" 187 } 188 189 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 190 health := p.client.Health() 191 opts := makeQueryOptionsWithContext(p, stale) 192 defer p.cancelFunc() 193 var checks []*consulapi.HealthCheck 194 var meta *consulapi.QueryMeta 195 var err error 196 if state != "" { 197 checks, meta, err = health.State(state, &opts) 198 } else { 199 checks, meta, err = health.Checks(service, &opts) 200 } 201 if err != nil { 202 return nil, nil, err 203 } 204 return WaitIndexVal(meta.LastIndex), checks, err 205 } 206 return fn, nil 207} 208 209// eventWatch is used to watch for events, optionally filtering on name 210func eventWatch(params map[string]interface{}) (WatcherFunc, error) { 211 // The stale setting doesn't apply to events. 212 213 var name string 214 if err := assignValue(params, "name", &name); err != nil { 215 return nil, err 216 } 217 218 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 219 event := p.client.Event() 220 opts := makeQueryOptionsWithContext(p, false) 221 defer p.cancelFunc() 222 events, meta, err := event.List(name, &opts) 223 if err != nil { 224 return nil, nil, err 225 } 226 227 // Prune to only the new events 228 for i := 0; i < len(events); i++ { 229 if WaitIndexVal(event.IDToIndex(events[i].ID)).Equal(p.lastParamVal) { 230 events = events[i+1:] 231 break 232 } 233 } 234 return WaitIndexVal(meta.LastIndex), events, err 235 } 236 return fn, nil 237} 238 239// connectRootsWatch is used to watch for changes to Connect Root certificates. 240func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) { 241 // We don't support stale since roots are cached locally in the agent. 242 243 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 244 agent := p.client.Agent() 245 opts := makeQueryOptionsWithContext(p, false) 246 defer p.cancelFunc() 247 248 roots, meta, err := agent.ConnectCARoots(&opts) 249 if err != nil { 250 return nil, nil, err 251 } 252 253 return WaitIndexVal(meta.LastIndex), roots, err 254 } 255 return fn, nil 256} 257 258// connectLeafWatch is used to watch for changes to Connect Leaf certificates 259// for given local service id. 260func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) { 261 // We don't support stale since certs are cached locally in the agent. 262 263 var serviceName string 264 if err := assignValue(params, "service", &serviceName); err != nil { 265 return nil, err 266 } 267 268 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 269 agent := p.client.Agent() 270 opts := makeQueryOptionsWithContext(p, false) 271 defer p.cancelFunc() 272 273 leaf, meta, err := agent.ConnectCALeaf(serviceName, &opts) 274 if err != nil { 275 return nil, nil, err 276 } 277 278 return WaitIndexVal(meta.LastIndex), leaf, err 279 } 280 return fn, nil 281} 282 283// agentServiceWatch is used to watch for changes to a single service instance 284// on the local agent. Note that this state is agent-local so the watch 285// mechanism uses `hash` rather than `index` for deciding whether to block. 286func agentServiceWatch(params map[string]interface{}) (WatcherFunc, error) { 287 // We don't support consistency modes since it's agent local data 288 289 var serviceID string 290 if err := assignValue(params, "service_id", &serviceID); err != nil { 291 return nil, err 292 } 293 294 fn := func(p *Plan) (BlockingParamVal, interface{}, error) { 295 agent := p.client.Agent() 296 opts := makeQueryOptionsWithContext(p, false) 297 defer p.cancelFunc() 298 299 svc, _, err := agent.Service(serviceID, &opts) 300 if err != nil { 301 return nil, nil, err 302 } 303 304 // Return string ContentHash since we don't have Raft indexes to block on. 305 return WaitHashVal(svc.ContentHash), svc, err 306 } 307 return fn, nil 308} 309 310func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions { 311 ctx, cancel := context.WithCancel(context.Background()) 312 p.setCancelFunc(cancel) 313 opts := consulapi.QueryOptions{AllowStale: stale} 314 switch param := p.lastParamVal.(type) { 315 case WaitIndexVal: 316 opts.WaitIndex = uint64(param) 317 case WaitHashVal: 318 opts.WaitHash = string(param) 319 } 320 return *opts.WithContext(ctx) 321} 322