1// Copyright 2015 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package zookeeper 15 16import ( 17 "context" 18 "encoding/json" 19 "fmt" 20 "net" 21 "strconv" 22 "strings" 23 "time" 24 25 "github.com/go-kit/kit/log" 26 "github.com/pkg/errors" 27 "github.com/prometheus/common/model" 28 "github.com/samuel/go-zookeeper/zk" 29 30 "github.com/prometheus/prometheus/discovery/targetgroup" 31 "github.com/prometheus/prometheus/util/strutil" 32 "github.com/prometheus/prometheus/util/treecache" 33) 34 35var ( 36 // DefaultServersetSDConfig is the default Serverset SD configuration. 37 DefaultServersetSDConfig = ServersetSDConfig{ 38 Timeout: model.Duration(10 * time.Second), 39 } 40 // DefaultNerveSDConfig is the default Nerve SD configuration. 41 DefaultNerveSDConfig = NerveSDConfig{ 42 Timeout: model.Duration(10 * time.Second), 43 } 44) 45 46// ServersetSDConfig is the configuration for Twitter serversets in Zookeeper based discovery. 47type ServersetSDConfig struct { 48 Servers []string `yaml:"servers"` 49 Paths []string `yaml:"paths"` 50 Timeout model.Duration `yaml:"timeout,omitempty"` 51} 52 53// UnmarshalYAML implements the yaml.Unmarshaler interface. 54func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { 55 *c = DefaultServersetSDConfig 56 type plain ServersetSDConfig 57 err := unmarshal((*plain)(c)) 58 if err != nil { 59 return err 60 } 61 if len(c.Servers) == 0 { 62 return errors.New("serverset SD config must contain at least one Zookeeper server") 63 } 64 if len(c.Paths) == 0 { 65 return errors.New("serverset SD config must contain at least one path") 66 } 67 for _, path := range c.Paths { 68 if !strings.HasPrefix(path, "/") { 69 return errors.Errorf("serverset SD config paths must begin with '/': %s", path) 70 } 71 } 72 return nil 73} 74 75// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery. 76type NerveSDConfig struct { 77 Servers []string `yaml:"servers"` 78 Paths []string `yaml:"paths"` 79 Timeout model.Duration `yaml:"timeout,omitempty"` 80} 81 82// UnmarshalYAML implements the yaml.Unmarshaler interface. 83func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { 84 *c = DefaultNerveSDConfig 85 type plain NerveSDConfig 86 err := unmarshal((*plain)(c)) 87 if err != nil { 88 return err 89 } 90 if len(c.Servers) == 0 { 91 return errors.New("nerve SD config must contain at least one Zookeeper server") 92 } 93 if len(c.Paths) == 0 { 94 return errors.New("nerve SD config must contain at least one path") 95 } 96 for _, path := range c.Paths { 97 if !strings.HasPrefix(path, "/") { 98 return errors.Errorf("nerve SD config paths must begin with '/': %s", path) 99 } 100 } 101 return nil 102} 103 104// Discovery implements the Discoverer interface for discovering 105// targets from Zookeeper. 106type Discovery struct { 107 conn *zk.Conn 108 109 sources map[string]*targetgroup.Group 110 111 updates chan treecache.ZookeeperTreeCacheEvent 112 pathUpdates []chan treecache.ZookeeperTreeCacheEvent 113 treeCaches []*treecache.ZookeeperTreeCache 114 115 parse func(data []byte, path string) (model.LabelSet, error) 116 logger log.Logger 117} 118 119// NewNerveDiscovery returns a new Discovery for the given Nerve config. 120func NewNerveDiscovery(conf *NerveSDConfig, logger log.Logger) (*Discovery, error) { 121 return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember) 122} 123 124// NewServersetDiscovery returns a new Discovery for the given serverset config. 125func NewServersetDiscovery(conf *ServersetSDConfig, logger log.Logger) (*Discovery, error) { 126 return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember) 127} 128 129// NewDiscovery returns a new discovery along Zookeeper parses with 130// the given parse function. 131func NewDiscovery( 132 srvs []string, 133 timeout time.Duration, 134 paths []string, 135 logger log.Logger, 136 pf func(data []byte, path string) (model.LabelSet, error), 137) (*Discovery, error) { 138 if logger == nil { 139 logger = log.NewNopLogger() 140 } 141 142 conn, _, err := zk.Connect( 143 srvs, timeout, 144 func(c *zk.Conn) { 145 c.SetLogger(treecache.NewZookeeperLogger(logger)) 146 }) 147 if err != nil { 148 return nil, err 149 } 150 updates := make(chan treecache.ZookeeperTreeCacheEvent) 151 sd := &Discovery{ 152 conn: conn, 153 updates: updates, 154 sources: map[string]*targetgroup.Group{}, 155 parse: pf, 156 logger: logger, 157 } 158 for _, path := range paths { 159 pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent) 160 sd.pathUpdates = append(sd.pathUpdates, pathUpdate) 161 sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger)) 162 } 163 return sd, nil 164} 165 166// Run implements the Discoverer interface. 167func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { 168 defer func() { 169 for _, tc := range d.treeCaches { 170 tc.Stop() 171 } 172 for _, pathUpdate := range d.pathUpdates { 173 // Drain event channel in case the treecache leaks goroutines otherwise. 174 for range pathUpdate { 175 } 176 } 177 d.conn.Close() 178 }() 179 180 for _, pathUpdate := range d.pathUpdates { 181 go func(update chan treecache.ZookeeperTreeCacheEvent) { 182 for event := range update { 183 select { 184 case d.updates <- event: 185 case <-ctx.Done(): 186 return 187 } 188 } 189 }(pathUpdate) 190 } 191 192 for { 193 select { 194 case <-ctx.Done(): 195 return 196 case event := <-d.updates: 197 tg := &targetgroup.Group{ 198 Source: event.Path, 199 } 200 if event.Data != nil { 201 labelSet, err := d.parse(*event.Data, event.Path) 202 if err == nil { 203 tg.Targets = []model.LabelSet{labelSet} 204 d.sources[event.Path] = tg 205 } else { 206 delete(d.sources, event.Path) 207 } 208 } else { 209 delete(d.sources, event.Path) 210 } 211 select { 212 case <-ctx.Done(): 213 return 214 case ch <- []*targetgroup.Group{tg}: 215 } 216 } 217 } 218} 219 220const ( 221 serversetLabelPrefix = model.MetaLabelPrefix + "serverset_" 222 serversetStatusLabel = serversetLabelPrefix + "status" 223 serversetPathLabel = serversetLabelPrefix + "path" 224 serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint" 225 serversetShardLabel = serversetLabelPrefix + "shard" 226) 227 228type serversetMember struct { 229 ServiceEndpoint serversetEndpoint 230 AdditionalEndpoints map[string]serversetEndpoint 231 Status string `json:"status"` 232 Shard int `json:"shard"` 233} 234 235type serversetEndpoint struct { 236 Host string 237 Port int 238} 239 240func parseServersetMember(data []byte, path string) (model.LabelSet, error) { 241 member := serversetMember{} 242 243 if err := json.Unmarshal(data, &member); err != nil { 244 return nil, errors.Wrapf(err, "error unmarshaling serverset member %q", path) 245 } 246 247 labels := model.LabelSet{} 248 labels[serversetPathLabel] = model.LabelValue(path) 249 labels[model.AddressLabel] = model.LabelValue( 250 net.JoinHostPort(member.ServiceEndpoint.Host, fmt.Sprintf("%d", member.ServiceEndpoint.Port))) 251 252 labels[serversetEndpointLabelPrefix+"_host"] = model.LabelValue(member.ServiceEndpoint.Host) 253 labels[serversetEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.ServiceEndpoint.Port)) 254 255 for name, endpoint := range member.AdditionalEndpoints { 256 cleanName := model.LabelName(strutil.SanitizeLabelName(name)) 257 labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = model.LabelValue( 258 endpoint.Host) 259 labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = model.LabelValue( 260 fmt.Sprintf("%d", endpoint.Port)) 261 262 } 263 264 labels[serversetStatusLabel] = model.LabelValue(member.Status) 265 labels[serversetShardLabel] = model.LabelValue(strconv.Itoa(member.Shard)) 266 267 return labels, nil 268} 269 270const ( 271 nerveLabelPrefix = model.MetaLabelPrefix + "nerve_" 272 nervePathLabel = nerveLabelPrefix + "path" 273 nerveEndpointLabelPrefix = nerveLabelPrefix + "endpoint" 274) 275 276type nerveMember struct { 277 Host string `json:"host"` 278 Port int `json:"port"` 279 Name string `json:"name"` 280} 281 282func parseNerveMember(data []byte, path string) (model.LabelSet, error) { 283 member := nerveMember{} 284 err := json.Unmarshal(data, &member) 285 if err != nil { 286 return nil, errors.Wrapf(err, "error unmarshaling nerve member %q", path) 287 } 288 289 labels := model.LabelSet{} 290 labels[nervePathLabel] = model.LabelValue(path) 291 labels[model.AddressLabel] = model.LabelValue( 292 net.JoinHostPort(member.Host, fmt.Sprintf("%d", member.Port))) 293 294 labels[nerveEndpointLabelPrefix+"_host"] = model.LabelValue(member.Host) 295 labels[nerveEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.Port)) 296 labels[nerveEndpointLabelPrefix+"_name"] = model.LabelValue(member.Name) 297 298 return labels, nil 299} 300