1package watch
2
3import (
4	"context"
5	"fmt"
6
7	consulapi "github.com/hashicorp/consul/api"
8)
9
// watchFactory is the signature shared by every watch constructor in this
// file: it takes the watch's parameter configuration and returns the
// WatcherFunc built from it, or an error if the configuration is rejected.
type watchFactory func(params map[string]interface{}) (WatcherFunc, error)
13
// watchFuncFactory maps each watch type name (e.g. "key", "service") to the
// factory function that builds its WatcherFunc. It is populated in init.
var watchFuncFactory map[string]watchFactory
16
17func init() {
18	watchFuncFactory = map[string]watchFactory{
19		"key":           keyWatch,
20		"keyprefix":     keyPrefixWatch,
21		"services":      servicesWatch,
22		"nodes":         nodesWatch,
23		"service":       serviceWatch,
24		"checks":        checksWatch,
25		"event":         eventWatch,
26		"connect_roots": connectRootsWatch,
27		"connect_leaf":  connectLeafWatch,
28		"agent_service": agentServiceWatch,
29	}
30}
31
32// keyWatch is used to return a key watching function
33func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
34	stale := false
35	if err := assignValueBool(params, "stale", &stale); err != nil {
36		return nil, err
37	}
38
39	var key string
40	if err := assignValue(params, "key", &key); err != nil {
41		return nil, err
42	}
43	if key == "" {
44		return nil, fmt.Errorf("Must specify a single key to watch")
45	}
46	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
47		kv := p.client.KV()
48		opts := makeQueryOptionsWithContext(p, stale)
49		defer p.cancelFunc()
50		pair, meta, err := kv.Get(key, &opts)
51		if err != nil {
52			return nil, nil, err
53		}
54		if pair == nil {
55			return WaitIndexVal(meta.LastIndex), nil, err
56		}
57		return WaitIndexVal(meta.LastIndex), pair, err
58	}
59	return fn, nil
60}
61
62// keyPrefixWatch is used to return a key prefix watching function
63func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
64	stale := false
65	if err := assignValueBool(params, "stale", &stale); err != nil {
66		return nil, err
67	}
68
69	var prefix string
70	if err := assignValue(params, "prefix", &prefix); err != nil {
71		return nil, err
72	}
73	if prefix == "" {
74		return nil, fmt.Errorf("Must specify a single prefix to watch")
75	}
76	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
77		kv := p.client.KV()
78		opts := makeQueryOptionsWithContext(p, stale)
79		defer p.cancelFunc()
80		pairs, meta, err := kv.List(prefix, &opts)
81		if err != nil {
82			return nil, nil, err
83		}
84		return WaitIndexVal(meta.LastIndex), pairs, err
85	}
86	return fn, nil
87}
88
89// servicesWatch is used to watch the list of available services
90func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
91	stale := false
92	if err := assignValueBool(params, "stale", &stale); err != nil {
93		return nil, err
94	}
95
96	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
97		catalog := p.client.Catalog()
98		opts := makeQueryOptionsWithContext(p, stale)
99		defer p.cancelFunc()
100		services, meta, err := catalog.Services(&opts)
101		if err != nil {
102			return nil, nil, err
103		}
104		return WaitIndexVal(meta.LastIndex), services, err
105	}
106	return fn, nil
107}
108
109// nodesWatch is used to watch the list of available nodes
110func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
111	stale := false
112	if err := assignValueBool(params, "stale", &stale); err != nil {
113		return nil, err
114	}
115
116	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
117		catalog := p.client.Catalog()
118		opts := makeQueryOptionsWithContext(p, stale)
119		defer p.cancelFunc()
120		nodes, meta, err := catalog.Nodes(&opts)
121		if err != nil {
122			return nil, nil, err
123		}
124		return WaitIndexVal(meta.LastIndex), nodes, err
125	}
126	return fn, nil
127}
128
129// serviceWatch is used to watch a specific service for changes
130func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
131	stale := false
132	if err := assignValueBool(params, "stale", &stale); err != nil {
133		return nil, err
134	}
135
136	var (
137		service string
138		tags    []string
139	)
140	if err := assignValue(params, "service", &service); err != nil {
141		return nil, err
142	}
143	if service == "" {
144		return nil, fmt.Errorf("Must specify a single service to watch")
145	}
146	if err := assignValueStringSlice(params, "tag", &tags); err != nil {
147		return nil, err
148	}
149
150	passingOnly := false
151	if err := assignValueBool(params, "passingonly", &passingOnly); err != nil {
152		return nil, err
153	}
154
155	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
156		health := p.client.Health()
157		opts := makeQueryOptionsWithContext(p, stale)
158		defer p.cancelFunc()
159		nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
160		if err != nil {
161			return nil, nil, err
162		}
163		return WaitIndexVal(meta.LastIndex), nodes, err
164	}
165	return fn, nil
166}
167
168// checksWatch is used to watch a specific checks in a given state
169func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
170	stale := false
171	if err := assignValueBool(params, "stale", &stale); err != nil {
172		return nil, err
173	}
174
175	var service, state string
176	if err := assignValue(params, "service", &service); err != nil {
177		return nil, err
178	}
179	if err := assignValue(params, "state", &state); err != nil {
180		return nil, err
181	}
182	if service != "" && state != "" {
183		return nil, fmt.Errorf("Cannot specify service and state")
184	}
185	if service == "" && state == "" {
186		state = "any"
187	}
188
189	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
190		health := p.client.Health()
191		opts := makeQueryOptionsWithContext(p, stale)
192		defer p.cancelFunc()
193		var checks []*consulapi.HealthCheck
194		var meta *consulapi.QueryMeta
195		var err error
196		if state != "" {
197			checks, meta, err = health.State(state, &opts)
198		} else {
199			checks, meta, err = health.Checks(service, &opts)
200		}
201		if err != nil {
202			return nil, nil, err
203		}
204		return WaitIndexVal(meta.LastIndex), checks, err
205	}
206	return fn, nil
207}
208
209// eventWatch is used to watch for events, optionally filtering on name
210func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
211	// The stale setting doesn't apply to events.
212
213	var name string
214	if err := assignValue(params, "name", &name); err != nil {
215		return nil, err
216	}
217
218	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
219		event := p.client.Event()
220		opts := makeQueryOptionsWithContext(p, false)
221		defer p.cancelFunc()
222		events, meta, err := event.List(name, &opts)
223		if err != nil {
224			return nil, nil, err
225		}
226
227		// Prune to only the new events
228		for i := 0; i < len(events); i++ {
229			if WaitIndexVal(event.IDToIndex(events[i].ID)).Equal(p.lastParamVal) {
230				events = events[i+1:]
231				break
232			}
233		}
234		return WaitIndexVal(meta.LastIndex), events, err
235	}
236	return fn, nil
237}
238
239// connectRootsWatch is used to watch for changes to Connect Root certificates.
240func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) {
241	// We don't support stale since roots are cached locally in the agent.
242
243	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
244		agent := p.client.Agent()
245		opts := makeQueryOptionsWithContext(p, false)
246		defer p.cancelFunc()
247
248		roots, meta, err := agent.ConnectCARoots(&opts)
249		if err != nil {
250			return nil, nil, err
251		}
252
253		return WaitIndexVal(meta.LastIndex), roots, err
254	}
255	return fn, nil
256}
257
258// connectLeafWatch is used to watch for changes to Connect Leaf certificates
259// for given local service id.
260func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) {
261	// We don't support stale since certs are cached locally in the agent.
262
263	var serviceName string
264	if err := assignValue(params, "service", &serviceName); err != nil {
265		return nil, err
266	}
267
268	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
269		agent := p.client.Agent()
270		opts := makeQueryOptionsWithContext(p, false)
271		defer p.cancelFunc()
272
273		leaf, meta, err := agent.ConnectCALeaf(serviceName, &opts)
274		if err != nil {
275			return nil, nil, err
276		}
277
278		return WaitIndexVal(meta.LastIndex), leaf, err
279	}
280	return fn, nil
281}
282
283// agentServiceWatch is used to watch for changes to a single service instance
284// on the local agent. Note that this state is agent-local so the watch
285// mechanism uses `hash` rather than `index` for deciding whether to block.
286func agentServiceWatch(params map[string]interface{}) (WatcherFunc, error) {
287	// We don't support consistency modes since it's agent local data
288
289	var serviceID string
290	if err := assignValue(params, "service_id", &serviceID); err != nil {
291		return nil, err
292	}
293
294	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
295		agent := p.client.Agent()
296		opts := makeQueryOptionsWithContext(p, false)
297		defer p.cancelFunc()
298
299		svc, _, err := agent.Service(serviceID, &opts)
300		if err != nil {
301			return nil, nil, err
302		}
303
304		// Return string ContentHash since we don't have Raft indexes to block on.
305		return WaitHashVal(svc.ContentHash), svc, err
306	}
307	return fn, nil
308}
309
310func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
311	ctx, cancel := context.WithCancel(context.Background())
312	p.setCancelFunc(cancel)
313	opts := consulapi.QueryOptions{AllowStale: stale}
314	switch param := p.lastParamVal.(type) {
315	case WaitIndexVal:
316		opts.WaitIndex = uint64(param)
317	case WaitHashVal:
318		opts.WaitHash = string(param)
319	}
320	return *opts.WithContext(ctx)
321}
322