1package watch 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "sync" 8 "time" 9 10 consulapi "github.com/hashicorp/consul/api" 11 "github.com/mitchellh/mapstructure" 12) 13 14const DefaultTimeout = 10 * time.Second 15 16// Plan is the parsed version of a watch specification. A watch provides 17// the details of a query, which generates a view into the Consul data store. 18// This view is watched for changes and a handler is invoked to take any 19// appropriate actions. 20type Plan struct { 21 Datacenter string 22 Token string 23 Type string 24 HandlerType string 25 Exempt map[string]interface{} 26 27 Watcher WatcherFunc 28 Handler HandlerFunc 29 LogOutput io.Writer 30 31 address string 32 client *consulapi.Client 33 lastIndex uint64 34 lastResult interface{} 35 36 stop bool 37 stopCh chan struct{} 38 stopLock sync.Mutex 39 cancelFunc context.CancelFunc 40} 41 42type HttpHandlerConfig struct { 43 Path string `mapstructure:"path"` 44 Method string `mapstructure:"method"` 45 Timeout time.Duration `mapstructure:"-"` 46 TimeoutRaw string `mapstructure:"timeout"` 47 Header map[string][]string `mapstructure:"header"` 48 TLSSkipVerify bool `mapstructure:"tls_skip_verify"` 49} 50 51// WatcherFunc is used to watch for a diff 52type WatcherFunc func(*Plan) (uint64, interface{}, error) 53 54// HandlerFunc is used to handle new data 55type HandlerFunc func(uint64, interface{}) 56 57// Parse takes a watch query and compiles it into a WatchPlan or an error 58func Parse(params map[string]interface{}) (*Plan, error) { 59 return ParseExempt(params, nil) 60} 61 62// ParseExempt takes a watch query and compiles it into a WatchPlan or an error 63// Any exempt parameters are stored in the Exempt map 64func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) { 65 plan := &Plan{ 66 stopCh: make(chan struct{}), 67 Exempt: make(map[string]interface{}), 68 } 69 70 // Parse the generic parameters 71 if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil { 72 return nil, err 73 } 74 if err := assignValue(params, "token", &plan.Token); err != nil { 75 return nil, err 76 } 77 if err := assignValue(params, "type", &plan.Type); err != nil { 78 return nil, err 79 } 80 // Ensure there is a watch type 81 if plan.Type == "" { 82 return nil, fmt.Errorf("Watch type must be specified") 83 } 84 85 // Get the specific handler 86 if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil { 87 return nil, err 88 } 89 switch plan.HandlerType { 90 case "http": 91 if _, ok := params["http_handler_config"]; !ok { 92 return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set") 93 } 94 config, err := parseHttpHandlerConfig(params["http_handler_config"]) 95 if err != nil { 96 return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err)) 97 } 98 plan.Exempt["http_handler_config"] = config 99 delete(params, "http_handler_config") 100 101 case "script": 102 // Let the caller check for configuration in exempt parameters 103 } 104 105 // Look for a factory function 106 factory := watchFuncFactory[plan.Type] 107 if factory == nil { 108 return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type) 109 } 110 111 // Get the watch func 112 fn, err := factory(params) 113 if err != nil { 114 return nil, err 115 } 116 plan.Watcher = fn 117 118 // Remove the exempt parameters 119 if len(exempt) > 0 { 120 for _, ex := range exempt { 121 val, ok := params[ex] 122 if ok { 123 plan.Exempt[ex] = val 124 delete(params, ex) 125 } 126 } 127 } 128 129 // Ensure all parameters are consumed 130 if len(params) != 0 { 131 var bad []string 132 for key := range params { 133 bad = append(bad, key) 134 } 135 return nil, fmt.Errorf("Invalid parameters: %v", bad) 136 } 137 return plan, nil 138} 139 140// assignValue is used to extract a value ensuring it is a string 141func assignValue(params map[string]interface{}, name string, out *string) error { 142 if raw, ok := params[name]; ok { 143 val, ok := raw.(string) 144 if !ok { 145 return fmt.Errorf("Expecting %s to be a string", name) 146 } 147 *out = val 148 delete(params, name) 149 } 150 return nil 151} 152 153// assignValueBool is used to extract a value ensuring it is a bool 154func assignValueBool(params map[string]interface{}, name string, out *bool) error { 155 if raw, ok := params[name]; ok { 156 val, ok := raw.(bool) 157 if !ok { 158 return fmt.Errorf("Expecting %s to be a boolean", name) 159 } 160 *out = val 161 delete(params, name) 162 } 163 return nil 164} 165 166// Parse the 'http_handler_config' parameters 167func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) { 168 var config HttpHandlerConfig 169 if err := mapstructure.Decode(configParams, &config); err != nil { 170 return nil, err 171 } 172 173 if config.Path == "" { 174 return nil, fmt.Errorf("Requires 'path' to be set") 175 } 176 if config.Method == "" { 177 config.Method = "POST" 178 } 179 if config.TimeoutRaw == "" { 180 config.Timeout = DefaultTimeout 181 } else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil { 182 return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err)) 183 } else { 184 config.Timeout = timeout 185 } 186 187 return &config, nil 188} 189