1package watch
2
3import (
4	"context"
5	"fmt"
6	"io"
7	"sync"
8	"time"
9
10	consulapi "github.com/hashicorp/consul/api"
11	"github.com/mitchellh/mapstructure"
12)
13
14const DefaultTimeout = 10 * time.Second
15
16// Plan is the parsed version of a watch specification. A watch provides
17// the details of a query, which generates a view into the Consul data store.
18// This view is watched for changes and a handler is invoked to take any
19// appropriate actions.
20type Plan struct {
21	Datacenter  string
22	Token       string
23	Type        string
24	HandlerType string
25	Exempt      map[string]interface{}
26
27	Watcher WatcherFunc
28	// Handler is kept for backward compatibility but only supports watches based
29	// on index param. To support hash based watches, set HybridHandler instead.
30	Handler       HandlerFunc
31	HybridHandler HybridHandlerFunc
32	LogOutput     io.Writer
33
34	address      string
35	client       *consulapi.Client
36	lastParamVal BlockingParamVal
37	lastResult   interface{}
38
39	stop       bool
40	stopCh     chan struct{}
41	stopLock   sync.Mutex
42	cancelFunc context.CancelFunc
43}
44
45type HttpHandlerConfig struct {
46	Path          string              `mapstructure:"path"`
47	Method        string              `mapstructure:"method"`
48	Timeout       time.Duration       `mapstructure:"-"`
49	TimeoutRaw    string              `mapstructure:"timeout"`
50	Header        map[string][]string `mapstructure:"header"`
51	TLSSkipVerify bool                `mapstructure:"tls_skip_verify"`
52}
53
54// BlockingParamVal is an interface representing the common operations needed for
55// different styles of blocking. It's used to abstract the core watch plan from
56// whether we are performing index-based or hash-based blocking.
57type BlockingParamVal interface {
58	// Equal returns whether the other param value should be considered equal
59	// (i.e. representing no change in the watched resource). Equal must not panic
60	// if other is nil.
61	Equal(other BlockingParamVal) bool
62
63	// Next is called when deciding which value to use on the next blocking call.
64	// It assumes the BlockingParamVal value it is called on is the most recent one
65	// returned and passes the previous one which may be nil as context. This
66	// allows types to customise logic around ordering without assuming there is
67	// an order. For example WaitIndexVal can check that the index didn't go
68	// backwards and if it did then reset to 0. Most other cases should just
69	// return themselves (the most recent value) to be used in the next request.
70	Next(previous BlockingParamVal) BlockingParamVal
71}
72
73// WaitIndexVal is a type representing a Consul index that implements
74// BlockingParamVal.
75type WaitIndexVal uint64
76
77// Equal implements BlockingParamVal
78func (idx WaitIndexVal) Equal(other BlockingParamVal) bool {
79	if otherIdx, ok := other.(WaitIndexVal); ok {
80		return idx == otherIdx
81	}
82	return false
83}
84
85// Next implements BlockingParamVal
86func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal {
87	if previous == nil {
88		return idx
89	}
90	prevIdx, ok := previous.(WaitIndexVal)
91	if ok && prevIdx > idx {
92		// This value is smaller than the previous index, reset.
93		return WaitIndexVal(0)
94	}
95	return idx
96}
97
98// WaitHashVal is a type representing a Consul content hash that implements
99// BlockingParamVal.
100type WaitHashVal string
101
102// Equal implements BlockingParamVal
103func (h WaitHashVal) Equal(other BlockingParamVal) bool {
104	if otherHash, ok := other.(WaitHashVal); ok {
105		return h == otherHash
106	}
107	return false
108}
109
110// Next implements BlockingParamVal
111func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal {
112	return h
113}
114
115// WatcherFunc is used to watch for a diff.
116type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error)
117
118// HandlerFunc is used to handle new data. It only works for index-based watches
119// (which is almost all end points currently) and is kept for backwards
120// compatibility until more places can make use of hash-based watches too.
121type HandlerFunc func(uint64, interface{})
122
123// HybridHandlerFunc is used to handle new data. It can support either
124// index-based or hash-based watches via the BlockingParamVal.
125type HybridHandlerFunc func(BlockingParamVal, interface{})
126
127// Parse takes a watch query and compiles it into a WatchPlan or an error
128func Parse(params map[string]interface{}) (*Plan, error) {
129	return ParseExempt(params, nil)
130}
131
132// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
133// Any exempt parameters are stored in the Exempt map
134func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
135	plan := &Plan{
136		stopCh: make(chan struct{}),
137		Exempt: make(map[string]interface{}),
138	}
139
140	// Parse the generic parameters
141	if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
142		return nil, err
143	}
144	if err := assignValue(params, "token", &plan.Token); err != nil {
145		return nil, err
146	}
147	if err := assignValue(params, "type", &plan.Type); err != nil {
148		return nil, err
149	}
150	// Ensure there is a watch type
151	if plan.Type == "" {
152		return nil, fmt.Errorf("Watch type must be specified")
153	}
154
155	// Get the specific handler
156	if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
157		return nil, err
158	}
159	switch plan.HandlerType {
160	case "http":
161		if _, ok := params["http_handler_config"]; !ok {
162			return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
163		}
164		config, err := parseHttpHandlerConfig(params["http_handler_config"])
165		if err != nil {
166			return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err))
167		}
168		plan.Exempt["http_handler_config"] = config
169		delete(params, "http_handler_config")
170
171	case "script":
172		// Let the caller check for configuration in exempt parameters
173	}
174
175	// Look for a factory function
176	factory := watchFuncFactory[plan.Type]
177	if factory == nil {
178		return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
179	}
180
181	// Get the watch func
182	fn, err := factory(params)
183	if err != nil {
184		return nil, err
185	}
186	plan.Watcher = fn
187
188	// Remove the exempt parameters
189	if len(exempt) > 0 {
190		for _, ex := range exempt {
191			val, ok := params[ex]
192			if ok {
193				plan.Exempt[ex] = val
194				delete(params, ex)
195			}
196		}
197	}
198
199	// Ensure all parameters are consumed
200	if len(params) != 0 {
201		var bad []string
202		for key := range params {
203			bad = append(bad, key)
204		}
205		return nil, fmt.Errorf("Invalid parameters: %v", bad)
206	}
207	return plan, nil
208}
209
210// assignValue is used to extract a value ensuring it is a string
211func assignValue(params map[string]interface{}, name string, out *string) error {
212	if raw, ok := params[name]; ok {
213		val, ok := raw.(string)
214		if !ok {
215			return fmt.Errorf("Expecting %s to be a string", name)
216		}
217		*out = val
218		delete(params, name)
219	}
220	return nil
221}
222
223// assignValueBool is used to extract a value ensuring it is a bool
224func assignValueBool(params map[string]interface{}, name string, out *bool) error {
225	if raw, ok := params[name]; ok {
226		val, ok := raw.(bool)
227		if !ok {
228			return fmt.Errorf("Expecting %s to be a boolean", name)
229		}
230		*out = val
231		delete(params, name)
232	}
233	return nil
234}
235
236// Parse the 'http_handler_config' parameters
237func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
238	var config HttpHandlerConfig
239	if err := mapstructure.Decode(configParams, &config); err != nil {
240		return nil, err
241	}
242
243	if config.Path == "" {
244		return nil, fmt.Errorf("Requires 'path' to be set")
245	}
246	if config.Method == "" {
247		config.Method = "POST"
248	}
249	if config.TimeoutRaw == "" {
250		config.Timeout = DefaultTimeout
251	} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil {
252		return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err))
253	} else {
254		config.Timeout = timeout
255	}
256
257	return &config, nil
258}
259