// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zookeeper

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/common/model"
	"github.com/samuel/go-zookeeper/zk"

	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/util/strutil"
	"github.com/prometheus/prometheus/util/treecache"
)

var (
	// DefaultServersetSDConfig is the default Serverset SD configuration.
	// It only sets Timeout (10 seconds); Servers and Paths have no default
	// and must be supplied by the user.
	DefaultServersetSDConfig = ServersetSDConfig{
		Timeout: model.Duration(10 * time.Second),
	}
	// DefaultNerveSDConfig is the default Nerve SD configuration.
	// It only sets Timeout (10 seconds); Servers and Paths have no default
	// and must be supplied by the user.
	DefaultNerveSDConfig = NerveSDConfig{
		Timeout: model.Duration(10 * time.Second),
	}
)

// ServersetSDConfig is the configuration for Twitter serversets in Zookeeper based discovery.
type ServersetSDConfig struct {
	// Servers lists the Zookeeper servers handed to zk.Connect. UnmarshalYAML
	// rejects an empty list.
	Servers []string `yaml:"servers"`
	// Paths lists the Zookeeper paths to watch; one tree cache is created per
	// path. UnmarshalYAML requires at least one, each beginning with '/'.
	Paths []string `yaml:"paths"`
	// Timeout is passed to zk.Connect as its timeout argument. When omitted
	// from the YAML it keeps the value from DefaultServersetSDConfig.
	Timeout model.Duration `yaml:"timeout,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
53func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { 54 *c = DefaultServersetSDConfig 55 type plain ServersetSDConfig 56 err := unmarshal((*plain)(c)) 57 if err != nil { 58 return err 59 } 60 if len(c.Servers) == 0 { 61 return fmt.Errorf("serverset SD config must contain at least one Zookeeper server") 62 } 63 if len(c.Paths) == 0 { 64 return fmt.Errorf("serverset SD config must contain at least one path") 65 } 66 for _, path := range c.Paths { 67 if !strings.HasPrefix(path, "/") { 68 return fmt.Errorf("serverset SD config paths must begin with '/': %s", path) 69 } 70 } 71 return nil 72} 73 74// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery. 75type NerveSDConfig struct { 76 Servers []string `yaml:"servers"` 77 Paths []string `yaml:"paths"` 78 Timeout model.Duration `yaml:"timeout,omitempty"` 79} 80 81// UnmarshalYAML implements the yaml.Unmarshaler interface. 82func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { 83 *c = DefaultNerveSDConfig 84 type plain NerveSDConfig 85 err := unmarshal((*plain)(c)) 86 if err != nil { 87 return err 88 } 89 if len(c.Servers) == 0 { 90 return fmt.Errorf("nerve SD config must contain at least one Zookeeper server") 91 } 92 if len(c.Paths) == 0 { 93 return fmt.Errorf("nerve SD config must contain at least one path") 94 } 95 for _, path := range c.Paths { 96 if !strings.HasPrefix(path, "/") { 97 return fmt.Errorf("nerve SD config paths must begin with '/': %s", path) 98 } 99 } 100 return nil 101} 102 103// Discovery implements the Discoverer interface for discovering 104// targets from Zookeeper. 
105type Discovery struct { 106 conn *zk.Conn 107 108 sources map[string]*targetgroup.Group 109 110 updates chan treecache.ZookeeperTreeCacheEvent 111 treeCaches []*treecache.ZookeeperTreeCache 112 113 parse func(data []byte, path string) (model.LabelSet, error) 114 logger log.Logger 115} 116 117// NewNerveDiscovery returns a new Discovery for the given Nerve config. 118func NewNerveDiscovery(conf *NerveSDConfig, logger log.Logger) (*Discovery, error) { 119 return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember) 120} 121 122// NewServersetDiscovery returns a new Discovery for the given serverset config. 123func NewServersetDiscovery(conf *ServersetSDConfig, logger log.Logger) (*Discovery, error) { 124 return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember) 125} 126 127// NewDiscovery returns a new discovery along Zookeeper parses with 128// the given parse function. 129func NewDiscovery( 130 srvs []string, 131 timeout time.Duration, 132 paths []string, 133 logger log.Logger, 134 pf func(data []byte, path string) (model.LabelSet, error), 135) (*Discovery, error) { 136 if logger == nil { 137 logger = log.NewNopLogger() 138 } 139 140 conn, _, err := zk.Connect( 141 srvs, timeout, 142 func(c *zk.Conn) { 143 c.SetLogger(treecache.NewZookeeperLogger(logger)) 144 }) 145 if err != nil { 146 return nil, err 147 } 148 updates := make(chan treecache.ZookeeperTreeCacheEvent) 149 sd := &Discovery{ 150 conn: conn, 151 updates: updates, 152 sources: map[string]*targetgroup.Group{}, 153 parse: pf, 154 logger: logger, 155 } 156 for _, path := range paths { 157 sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates, logger)) 158 } 159 return sd, nil 160} 161 162// Run implements the Discoverer interface. 
163func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { 164 defer func() { 165 for _, tc := range d.treeCaches { 166 tc.Stop() 167 } 168 // Drain event channel in case the treecache leaks goroutines otherwise. 169 for range d.updates { 170 } 171 d.conn.Close() 172 }() 173 174 for { 175 select { 176 case <-ctx.Done(): 177 return 178 case event := <-d.updates: 179 tg := &targetgroup.Group{ 180 Source: event.Path, 181 } 182 if event.Data != nil { 183 labelSet, err := d.parse(*event.Data, event.Path) 184 if err == nil { 185 tg.Targets = []model.LabelSet{labelSet} 186 d.sources[event.Path] = tg 187 } else { 188 delete(d.sources, event.Path) 189 } 190 } else { 191 delete(d.sources, event.Path) 192 } 193 select { 194 case <-ctx.Done(): 195 return 196 case ch <- []*targetgroup.Group{tg}: 197 } 198 } 199 } 200} 201 202const ( 203 serversetLabelPrefix = model.MetaLabelPrefix + "serverset_" 204 serversetStatusLabel = serversetLabelPrefix + "status" 205 serversetPathLabel = serversetLabelPrefix + "path" 206 serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint" 207 serversetShardLabel = serversetLabelPrefix + "shard" 208) 209 210type serversetMember struct { 211 ServiceEndpoint serversetEndpoint 212 AdditionalEndpoints map[string]serversetEndpoint 213 Status string `json:"status"` 214 Shard int `json:"shard"` 215} 216 217type serversetEndpoint struct { 218 Host string 219 Port int 220} 221 222func parseServersetMember(data []byte, path string) (model.LabelSet, error) { 223 member := serversetMember{} 224 225 if err := json.Unmarshal(data, &member); err != nil { 226 return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err) 227 } 228 229 labels := model.LabelSet{} 230 labels[serversetPathLabel] = model.LabelValue(path) 231 labels[model.AddressLabel] = model.LabelValue( 232 net.JoinHostPort(member.ServiceEndpoint.Host, fmt.Sprintf("%d", member.ServiceEndpoint.Port))) 233 234 labels[serversetEndpointLabelPrefix+"_host"] = 
model.LabelValue(member.ServiceEndpoint.Host) 235 labels[serversetEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.ServiceEndpoint.Port)) 236 237 for name, endpoint := range member.AdditionalEndpoints { 238 cleanName := model.LabelName(strutil.SanitizeLabelName(name)) 239 labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = model.LabelValue( 240 endpoint.Host) 241 labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = model.LabelValue( 242 fmt.Sprintf("%d", endpoint.Port)) 243 244 } 245 246 labels[serversetStatusLabel] = model.LabelValue(member.Status) 247 labels[serversetShardLabel] = model.LabelValue(strconv.Itoa(member.Shard)) 248 249 return labels, nil 250} 251 252const ( 253 nerveLabelPrefix = model.MetaLabelPrefix + "nerve_" 254 nervePathLabel = nerveLabelPrefix + "path" 255 nerveEndpointLabelPrefix = nerveLabelPrefix + "endpoint" 256) 257 258type nerveMember struct { 259 Host string `json:"host"` 260 Port int `json:"port"` 261 Name string `json:"name"` 262} 263 264func parseNerveMember(data []byte, path string) (model.LabelSet, error) { 265 member := nerveMember{} 266 err := json.Unmarshal(data, &member) 267 if err != nil { 268 return nil, fmt.Errorf("error unmarshaling nerve member %q: %s", path, err) 269 } 270 271 labels := model.LabelSet{} 272 labels[nervePathLabel] = model.LabelValue(path) 273 labels[model.AddressLabel] = model.LabelValue( 274 net.JoinHostPort(member.Host, fmt.Sprintf("%d", member.Port))) 275 276 labels[nerveEndpointLabelPrefix+"_host"] = model.LabelValue(member.Host) 277 labels[nerveEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.Port)) 278 labels[nerveEndpointLabelPrefix+"_name"] = model.LabelValue(member.Name) 279 280 return labels, nil 281} 282