1package watch
2
3import (
4	"context"
5	"fmt"
6	"io"
7	"sync"
8	"time"
9
10	consulapi "github.com/hashicorp/consul/api"
11	"github.com/mitchellh/mapstructure"
12)
13
// DefaultTimeout is the timeout applied to an HTTP handler configuration
// when its "timeout" setting is left empty.
const DefaultTimeout = 10 * time.Second
15
// Plan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
//
// Plans are built by Parse / ParseExempt, which fill in the exported fields
// as follows:
//
//   - Datacenter, Token, Type and HandlerType are copied from the string
//     parameters "datacenter", "token", "type" and "handler_type". Type is
//     mandatory and selects the watch factory that produces Watcher.
//   - Exempt collects the caller-designated exempt parameters and, when
//     HandlerType is "http", the decoded *HttpHandlerConfig under the key
//     "http_handler_config".
//   - Watcher is the function returned by the factory registered for Type.
//
// Handler and LogOutput are not set by the parser; presumably the caller
// assigns them before the plan is run (TODO confirm against the code that
// executes a plan, which is not visible here).
//
// The unexported fields are runtime state. Only stopCh is initialized by the
// parser; the remaining ones (address, client, lastIndex, lastResult, stop,
// stopLock, cancelFunc) are presumably managed by the code that executes the
// plan. NOTE(review): verify where they are read and written.
type Plan struct {
	Datacenter  string
	Token       string
	Type        string
	HandlerType string
	Exempt      map[string]interface{}

	Watcher   WatcherFunc
	Handler   HandlerFunc
	LogOutput io.Writer

	address    string
	client     *consulapi.Client
	lastIndex  uint64
	lastResult interface{}

	stop       bool
	stopCh     chan struct{}
	stopLock   sync.Mutex
	cancelFunc context.CancelFunc
}
41
// HttpHandlerConfig is the decoded form of the "http_handler_config" watch
// parameter, produced by parseHttpHandlerConfig via mapstructure.
//
// Path is mandatory. Method falls back to "POST" when empty. TimeoutRaw
// carries the textual "timeout" setting; it is parsed with
// time.ParseDuration into Timeout, which defaults to DefaultTimeout when
// TimeoutRaw is empty. Timeout itself is never decoded from the input
// (mapstructure tag "-"). Header and TLSSkipVerify are decoded as given;
// presumably they control the request headers and TLS certificate
// verification of the outgoing request (TODO confirm against the handler
// implementation).
type HttpHandlerConfig struct {
	Path          string              `mapstructure:"path"`
	Method        string              `mapstructure:"method"`
	Timeout       time.Duration       `mapstructure:"-"`
	TimeoutRaw    string              `mapstructure:"timeout"`
	Header        map[string][]string `mapstructure:"header"`
	TLSSkipVerify bool                `mapstructure:"tls_skip_verify"`
}
50
// WatcherFunc is used to watch for a diff. It receives the Plan being
// watched and returns a uint64, an arbitrary result value and an error.
// NOTE(review): the uint64 is presumably the index associated with the
// result (compare Plan.lastIndex / Plan.lastResult); confirm against the
// watch factories.
type WatcherFunc func(*Plan) (uint64, interface{}, error)
53
// HandlerFunc is used to handle new data. It receives a uint64 and an
// arbitrary value. NOTE(review): presumably these are the index and result
// produced by a WatcherFunc; confirm against the code that invokes handlers.
type HandlerFunc func(uint64, interface{})
56
57// Parse takes a watch query and compiles it into a WatchPlan or an error
58func Parse(params map[string]interface{}) (*Plan, error) {
59	return ParseExempt(params, nil)
60}
61
62// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
63// Any exempt parameters are stored in the Exempt map
64func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
65	plan := &Plan{
66		stopCh: make(chan struct{}),
67		Exempt: make(map[string]interface{}),
68	}
69
70	// Parse the generic parameters
71	if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
72		return nil, err
73	}
74	if err := assignValue(params, "token", &plan.Token); err != nil {
75		return nil, err
76	}
77	if err := assignValue(params, "type", &plan.Type); err != nil {
78		return nil, err
79	}
80	// Ensure there is a watch type
81	if plan.Type == "" {
82		return nil, fmt.Errorf("Watch type must be specified")
83	}
84
85	// Get the specific handler
86	if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
87		return nil, err
88	}
89	switch plan.HandlerType {
90	case "http":
91		if _, ok := params["http_handler_config"]; !ok {
92			return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
93		}
94		config, err := parseHttpHandlerConfig(params["http_handler_config"])
95		if err != nil {
96			return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err))
97		}
98		plan.Exempt["http_handler_config"] = config
99		delete(params, "http_handler_config")
100
101	case "script":
102		// Let the caller check for configuration in exempt parameters
103	}
104
105	// Look for a factory function
106	factory := watchFuncFactory[plan.Type]
107	if factory == nil {
108		return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
109	}
110
111	// Get the watch func
112	fn, err := factory(params)
113	if err != nil {
114		return nil, err
115	}
116	plan.Watcher = fn
117
118	// Remove the exempt parameters
119	if len(exempt) > 0 {
120		for _, ex := range exempt {
121			val, ok := params[ex]
122			if ok {
123				plan.Exempt[ex] = val
124				delete(params, ex)
125			}
126		}
127	}
128
129	// Ensure all parameters are consumed
130	if len(params) != 0 {
131		var bad []string
132		for key := range params {
133			bad = append(bad, key)
134		}
135		return nil, fmt.Errorf("Invalid parameters: %v", bad)
136	}
137	return plan, nil
138}
139
// assignValue moves the string stored under name in params into out.
//
// When the key is absent nothing happens and nil is returned. When the value
// is not a string an error is returned and neither params nor out is
// modified. On success the value is written to out and the key is removed
// from params.
func assignValue(params map[string]interface{}, name string, out *string) error {
	raw, present := params[name]
	if !present {
		return nil
	}

	str, isString := raw.(string)
	if !isString {
		return fmt.Errorf("Expecting %s to be a string", name)
	}

	*out = str
	delete(params, name)
	return nil
}
152
// assignValueBool moves the boolean stored under name in params into out.
//
// When the key is absent nothing happens and nil is returned. When the value
// is not a bool an error is returned and neither params nor out is modified.
// On success the value is written to out and the key is removed from params.
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
	raw, present := params[name]
	if !present {
		return nil
	}

	flag, isBool := raw.(bool)
	if !isBool {
		return fmt.Errorf("Expecting %s to be a boolean", name)
	}

	*out = flag
	delete(params, name)
	return nil
}
165
166// Parse the 'http_handler_config' parameters
167func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
168	var config HttpHandlerConfig
169	if err := mapstructure.Decode(configParams, &config); err != nil {
170		return nil, err
171	}
172
173	if config.Path == "" {
174		return nil, fmt.Errorf("Requires 'path' to be set")
175	}
176	if config.Method == "" {
177		config.Method = "POST"
178	}
179	if config.TimeoutRaw == "" {
180		config.Timeout = DefaultTimeout
181	} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil {
182		return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err))
183	} else {
184		config.Timeout = timeout
185	}
186
187	return &config, nil
188}
189