1package watch 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "sync" 8 "time" 9 10 consulapi "github.com/hashicorp/consul/api" 11 "github.com/mitchellh/mapstructure" 12) 13 14const DefaultTimeout = 10 * time.Second 15 16// Plan is the parsed version of a watch specification. A watch provides 17// the details of a query, which generates a view into the Consul data store. 18// This view is watched for changes and a handler is invoked to take any 19// appropriate actions. 20type Plan struct { 21 Datacenter string 22 Token string 23 Type string 24 HandlerType string 25 Exempt map[string]interface{} 26 27 Watcher WatcherFunc 28 // Handler is kept for backward compatibility but only supports watches based 29 // on index param. To support hash based watches, set HybridHandler instead. 30 Handler HandlerFunc 31 HybridHandler HybridHandlerFunc 32 LogOutput io.Writer 33 34 address string 35 client *consulapi.Client 36 lastParamVal BlockingParamVal 37 lastResult interface{} 38 39 stop bool 40 stopCh chan struct{} 41 stopLock sync.Mutex 42 cancelFunc context.CancelFunc 43} 44 45type HttpHandlerConfig struct { 46 Path string `mapstructure:"path"` 47 Method string `mapstructure:"method"` 48 Timeout time.Duration `mapstructure:"-"` 49 TimeoutRaw string `mapstructure:"timeout"` 50 Header map[string][]string `mapstructure:"header"` 51 TLSSkipVerify bool `mapstructure:"tls_skip_verify"` 52} 53 54// BlockingParamVal is an interface representing the common operations needed for 55// different styles of blocking. It's used to abstract the core watch plan from 56// whether we are performing index-based or hash-based blocking. 57type BlockingParamVal interface { 58 // Equal returns whether the other param value should be considered equal 59 // (i.e. representing no change in the watched resource). Equal must not panic 60 // if other is nil. 61 Equal(other BlockingParamVal) bool 62 63 // Next is called when deciding which value to use on the next blocking call. 64 // It assumes the BlockingParamVal value it is called on is the most recent one 65 // returned and passes the previous one which may be nil as context. This 66 // allows types to customize logic around ordering without assuming there is 67 // an order. For example WaitIndexVal can check that the index didn't go 68 // backwards and if it did then reset to 0. Most other cases should just 69 // return themselves (the most recent value) to be used in the next request. 70 Next(previous BlockingParamVal) BlockingParamVal 71} 72 73// WaitIndexVal is a type representing a Consul index that implements 74// BlockingParamVal. 75type WaitIndexVal uint64 76 77// Equal implements BlockingParamVal 78func (idx WaitIndexVal) Equal(other BlockingParamVal) bool { 79 if otherIdx, ok := other.(WaitIndexVal); ok { 80 return idx == otherIdx 81 } 82 return false 83} 84 85// Next implements BlockingParamVal 86func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal { 87 if previous == nil { 88 return idx 89 } 90 prevIdx, ok := previous.(WaitIndexVal) 91 if ok && prevIdx == idx { 92 // This value is the same as the previous index, reset 93 return WaitIndexVal(0) 94 } 95 return idx 96} 97 98// WaitHashVal is a type representing a Consul content hash that implements 99// BlockingParamVal. 100type WaitHashVal string 101 102// Equal implements BlockingParamVal 103func (h WaitHashVal) Equal(other BlockingParamVal) bool { 104 if otherHash, ok := other.(WaitHashVal); ok { 105 return h == otherHash 106 } 107 return false 108} 109 110// Next implements BlockingParamVal 111func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal { 112 return h 113} 114 115// WatcherFunc is used to watch for a diff. 116type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error) 117 118// HandlerFunc is used to handle new data. It only works for index-based watches 119// (which is almost all end points currently) and is kept for backwards 120// compatibility until more places can make use of hash-based watches too. 121type HandlerFunc func(uint64, interface{}) 122 123// HybridHandlerFunc is used to handle new data. It can support either 124// index-based or hash-based watches via the BlockingParamVal. 125type HybridHandlerFunc func(BlockingParamVal, interface{}) 126 127// Parse takes a watch query and compiles it into a WatchPlan or an error 128func Parse(params map[string]interface{}) (*Plan, error) { 129 return ParseExempt(params, nil) 130} 131 132// ParseExempt takes a watch query and compiles it into a WatchPlan or an error 133// Any exempt parameters are stored in the Exempt map 134func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) { 135 plan := &Plan{ 136 stopCh: make(chan struct{}), 137 Exempt: make(map[string]interface{}), 138 } 139 140 // Parse the generic parameters 141 if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil { 142 return nil, err 143 } 144 if err := assignValue(params, "token", &plan.Token); err != nil { 145 return nil, err 146 } 147 if err := assignValue(params, "type", &plan.Type); err != nil { 148 return nil, err 149 } 150 // Ensure there is a watch type 151 if plan.Type == "" { 152 return nil, fmt.Errorf("Watch type must be specified") 153 } 154 155 // Get the specific handler 156 if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil { 157 return nil, err 158 } 159 switch plan.HandlerType { 160 case "http": 161 if _, ok := params["http_handler_config"]; !ok { 162 return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set") 163 } 164 config, err := parseHttpHandlerConfig(params["http_handler_config"]) 165 if err != nil { 166 return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err)) 167 } 168 plan.Exempt["http_handler_config"] = config 169 delete(params, "http_handler_config") 170 171 case "script": 172 // Let the caller check for configuration in exempt parameters 173 } 174 175 // Look for a factory function 176 factory := watchFuncFactory[plan.Type] 177 if factory == nil { 178 return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type) 179 } 180 181 // Get the watch func 182 fn, err := factory(params) 183 if err != nil { 184 return nil, err 185 } 186 plan.Watcher = fn 187 188 // Remove the exempt parameters 189 if len(exempt) > 0 { 190 for _, ex := range exempt { 191 val, ok := params[ex] 192 if ok { 193 plan.Exempt[ex] = val 194 delete(params, ex) 195 } 196 } 197 } 198 199 // Ensure all parameters are consumed 200 if len(params) != 0 { 201 var bad []string 202 for key := range params { 203 bad = append(bad, key) 204 } 205 return nil, fmt.Errorf("Invalid parameters: %v", bad) 206 } 207 return plan, nil 208} 209 210// assignValue is used to extract a value ensuring it is a string 211func assignValue(params map[string]interface{}, name string, out *string) error { 212 if raw, ok := params[name]; ok { 213 val, ok := raw.(string) 214 if !ok { 215 return fmt.Errorf("Expecting %s to be a string", name) 216 } 217 *out = val 218 delete(params, name) 219 } 220 return nil 221} 222 223// assignValueBool is used to extract a value ensuring it is a bool 224func assignValueBool(params map[string]interface{}, name string, out *bool) error { 225 if raw, ok := params[name]; ok { 226 val, ok := raw.(bool) 227 if !ok { 228 return fmt.Errorf("Expecting %s to be a boolean", name) 229 } 230 *out = val 231 delete(params, name) 232 } 233 return nil 234} 235 236// assignValueStringSlice is used to extract a value ensuring it is either a string or a slice of strings 237func assignValueStringSlice(params map[string]interface{}, name string, out *[]string) error { 238 if raw, ok := params[name]; ok { 239 var tmp []string 240 switch raw.(type) { 241 case string: 242 tmp = make([]string, 1, 1) 243 tmp[0] = raw.(string) 244 case []string: 245 l := len(raw.([]string)) 246 tmp = make([]string, l, l) 247 copy(tmp, raw.([]string)) 248 case []interface{}: 249 l := len(raw.([]interface{})) 250 tmp = make([]string, l, l) 251 for i, v := range raw.([]interface{}) { 252 if s, ok := v.(string); ok { 253 tmp[i] = s 254 } else { 255 return fmt.Errorf("Index %d of %s expected to be string", i, name) 256 } 257 } 258 default: 259 return fmt.Errorf("Expecting %s to be a string or []string", name) 260 } 261 *out = tmp 262 delete(params, name) 263 } 264 return nil 265} 266 267// Parse the 'http_handler_config' parameters 268func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) { 269 var config HttpHandlerConfig 270 if err := mapstructure.Decode(configParams, &config); err != nil { 271 return nil, err 272 } 273 274 if config.Path == "" { 275 return nil, fmt.Errorf("Requires 'path' to be set") 276 } 277 if config.Method == "" { 278 config.Method = "POST" 279 } 280 if config.TimeoutRaw == "" { 281 config.Timeout = DefaultTimeout 282 } else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil { 283 return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err)) 284 } else { 285 config.Timeout = timeout 286 } 287 288 return &config, nil 289} 290