1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package zookeeper
15
16import (
17	"context"
18	"encoding/json"
19	"fmt"
20	"net"
21	"strconv"
22	"strings"
23	"time"
24
25	"github.com/go-kit/kit/log"
26	"github.com/pkg/errors"
27	"github.com/prometheus/common/model"
28	"github.com/samuel/go-zookeeper/zk"
29
30	"github.com/prometheus/prometheus/discovery/targetgroup"
31	"github.com/prometheus/prometheus/util/strutil"
32	"github.com/prometheus/prometheus/util/treecache"
33)
34
35var (
36	// DefaultServersetSDConfig is the default Serverset SD configuration.
37	DefaultServersetSDConfig = ServersetSDConfig{
38		Timeout: model.Duration(10 * time.Second),
39	}
40	// DefaultNerveSDConfig is the default Nerve SD configuration.
41	DefaultNerveSDConfig = NerveSDConfig{
42		Timeout: model.Duration(10 * time.Second),
43	}
44)
45
// ServersetSDConfig is the configuration for Twitter serversets in Zookeeper based discovery.
//
// Servers lists the Zookeeper servers to connect to, Paths lists the znode
// paths to watch (each must start with '/'), and Timeout is the duration
// handed to the Zookeeper client when connecting. Defaults come from
// DefaultServersetSDConfig when decoded via UnmarshalYAML.
type ServersetSDConfig struct {
	Servers []string       `yaml:"servers"`
	Paths   []string       `yaml:"paths"`
	Timeout model.Duration `yaml:"timeout,omitempty"`
}
52
53// UnmarshalYAML implements the yaml.Unmarshaler interface.
54func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
55	*c = DefaultServersetSDConfig
56	type plain ServersetSDConfig
57	err := unmarshal((*plain)(c))
58	if err != nil {
59		return err
60	}
61	if len(c.Servers) == 0 {
62		return errors.New("serverset SD config must contain at least one Zookeeper server")
63	}
64	if len(c.Paths) == 0 {
65		return errors.New("serverset SD config must contain at least one path")
66	}
67	for _, path := range c.Paths {
68		if !strings.HasPrefix(path, "/") {
69			return errors.Errorf("serverset SD config paths must begin with '/': %s", path)
70		}
71	}
72	return nil
73}
74
// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery.
//
// Servers lists the Zookeeper servers to connect to, Paths lists the znode
// paths to watch (each must start with '/'), and Timeout is the duration
// handed to the Zookeeper client when connecting. Defaults come from
// DefaultNerveSDConfig when decoded via UnmarshalYAML.
type NerveSDConfig struct {
	Servers []string       `yaml:"servers"`
	Paths   []string       `yaml:"paths"`
	Timeout model.Duration `yaml:"timeout,omitempty"`
}
81
82// UnmarshalYAML implements the yaml.Unmarshaler interface.
83func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
84	*c = DefaultNerveSDConfig
85	type plain NerveSDConfig
86	err := unmarshal((*plain)(c))
87	if err != nil {
88		return err
89	}
90	if len(c.Servers) == 0 {
91		return errors.New("nerve SD config must contain at least one Zookeeper server")
92	}
93	if len(c.Paths) == 0 {
94		return errors.New("nerve SD config must contain at least one path")
95	}
96	for _, path := range c.Paths {
97		if !strings.HasPrefix(path, "/") {
98			return errors.Errorf("nerve SD config paths must begin with '/': %s", path)
99		}
100	}
101	return nil
102}
103
// Discovery implements the Discoverer interface for discovering
// targets from Zookeeper.
type Discovery struct {
	// conn is the Zookeeper connection shared by all tree caches; Run closes
	// it when it returns.
	conn *zk.Conn

	// sources maps a znode path to the last group built from it whose data
	// parsed successfully. Run deletes the entry when a node's data is nil or
	// fails to parse. NOTE(review): no reader is visible in this file section.
	sources map[string]*targetgroup.Group

	// updates is the single channel Run consumes; Run forwards every event
	// from the per-path channels in pathUpdates into it. treeCaches[i] was
	// created with pathUpdates[i] as its event channel.
	updates     chan treecache.ZookeeperTreeCacheEvent
	pathUpdates []chan treecache.ZookeeperTreeCacheEvent
	treeCaches  []*treecache.ZookeeperTreeCache

	// parse turns a member znode's data and path into target labels
	// (parseServersetMember or parseNerveMember for the built-in configs).
	parse  func(data []byte, path string) (model.LabelSet, error)
	logger log.Logger
}
118
119// NewNerveDiscovery returns a new Discovery for the given Nerve config.
120func NewNerveDiscovery(conf *NerveSDConfig, logger log.Logger) (*Discovery, error) {
121	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember)
122}
123
124// NewServersetDiscovery returns a new Discovery for the given serverset config.
125func NewServersetDiscovery(conf *ServersetSDConfig, logger log.Logger) (*Discovery, error) {
126	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember)
127}
128
129// NewDiscovery returns a new discovery along Zookeeper parses with
130// the given parse function.
131func NewDiscovery(
132	srvs []string,
133	timeout time.Duration,
134	paths []string,
135	logger log.Logger,
136	pf func(data []byte, path string) (model.LabelSet, error),
137) (*Discovery, error) {
138	if logger == nil {
139		logger = log.NewNopLogger()
140	}
141
142	conn, _, err := zk.Connect(
143		srvs, timeout,
144		func(c *zk.Conn) {
145			c.SetLogger(treecache.NewZookeeperLogger(logger))
146		})
147	if err != nil {
148		return nil, err
149	}
150	updates := make(chan treecache.ZookeeperTreeCacheEvent)
151	sd := &Discovery{
152		conn:    conn,
153		updates: updates,
154		sources: map[string]*targetgroup.Group{},
155		parse:   pf,
156		logger:  logger,
157	}
158	for _, path := range paths {
159		pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent)
160		sd.pathUpdates = append(sd.pathUpdates, pathUpdate)
161		sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger))
162	}
163	return sd, nil
164}
165
166// Run implements the Discoverer interface.
167func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
168	defer func() {
169		for _, tc := range d.treeCaches {
170			tc.Stop()
171		}
172		for _, pathUpdate := range d.pathUpdates {
173			// Drain event channel in case the treecache leaks goroutines otherwise.
174			for range pathUpdate {
175			}
176		}
177		d.conn.Close()
178	}()
179
180	for _, pathUpdate := range d.pathUpdates {
181		go func(update chan treecache.ZookeeperTreeCacheEvent) {
182			for event := range update {
183				select {
184				case d.updates <- event:
185				case <-ctx.Done():
186					return
187				}
188			}
189		}(pathUpdate)
190	}
191
192	for {
193		select {
194		case <-ctx.Done():
195			return
196		case event := <-d.updates:
197			tg := &targetgroup.Group{
198				Source: event.Path,
199			}
200			if event.Data != nil {
201				labelSet, err := d.parse(*event.Data, event.Path)
202				if err == nil {
203					tg.Targets = []model.LabelSet{labelSet}
204					d.sources[event.Path] = tg
205				} else {
206					delete(d.sources, event.Path)
207				}
208			} else {
209				delete(d.sources, event.Path)
210			}
211			select {
212			case <-ctx.Done():
213				return
214			case ch <- []*targetgroup.Group{tg}:
215			}
216		}
217	}
218}
219
220const (
221	serversetLabelPrefix         = model.MetaLabelPrefix + "serverset_"
222	serversetStatusLabel         = serversetLabelPrefix + "status"
223	serversetPathLabel           = serversetLabelPrefix + "path"
224	serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint"
225	serversetShardLabel          = serversetLabelPrefix + "shard"
226)
227
// serversetMember is the JSON payload stored in a serverset member znode.
// ServiceEndpoint and AdditionalEndpoints have no json tags, so encoding/json
// matches them against object keys by field name, accepting case-insensitive
// matches (e.g. a "serviceEndpoint" key).
type serversetMember struct {
	ServiceEndpoint     serversetEndpoint
	AdditionalEndpoints map[string]serversetEndpoint
	Status              string `json:"status"`
	Shard               int    `json:"shard"`
}

// serversetEndpoint is a host/port pair inside a serversetMember; its untagged
// fields are matched the same way (e.g. "host", "port" keys).
type serversetEndpoint struct {
	Host string
	Port int
}
239
240func parseServersetMember(data []byte, path string) (model.LabelSet, error) {
241	member := serversetMember{}
242
243	if err := json.Unmarshal(data, &member); err != nil {
244		return nil, errors.Wrapf(err, "error unmarshaling serverset member %q", path)
245	}
246
247	labels := model.LabelSet{}
248	labels[serversetPathLabel] = model.LabelValue(path)
249	labels[model.AddressLabel] = model.LabelValue(
250		net.JoinHostPort(member.ServiceEndpoint.Host, fmt.Sprintf("%d", member.ServiceEndpoint.Port)))
251
252	labels[serversetEndpointLabelPrefix+"_host"] = model.LabelValue(member.ServiceEndpoint.Host)
253	labels[serversetEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.ServiceEndpoint.Port))
254
255	for name, endpoint := range member.AdditionalEndpoints {
256		cleanName := model.LabelName(strutil.SanitizeLabelName(name))
257		labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = model.LabelValue(
258			endpoint.Host)
259		labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = model.LabelValue(
260			fmt.Sprintf("%d", endpoint.Port))
261
262	}
263
264	labels[serversetStatusLabel] = model.LabelValue(member.Status)
265	labels[serversetShardLabel] = model.LabelValue(strconv.Itoa(member.Shard))
266
267	return labels, nil
268}
269
270const (
271	nerveLabelPrefix         = model.MetaLabelPrefix + "nerve_"
272	nervePathLabel           = nerveLabelPrefix + "path"
273	nerveEndpointLabelPrefix = nerveLabelPrefix + "endpoint"
274)
275
// nerveMember is the JSON payload stored in a Nerve member znode: the
// registered host, port and service name.
type nerveMember struct {
	Host string `json:"host"`
	Port int    `json:"port"`
	Name string `json:"name"`
}
281
282func parseNerveMember(data []byte, path string) (model.LabelSet, error) {
283	member := nerveMember{}
284	err := json.Unmarshal(data, &member)
285	if err != nil {
286		return nil, errors.Wrapf(err, "error unmarshaling nerve member %q", path)
287	}
288
289	labels := model.LabelSet{}
290	labels[nervePathLabel] = model.LabelValue(path)
291	labels[model.AddressLabel] = model.LabelValue(
292		net.JoinHostPort(member.Host, fmt.Sprintf("%d", member.Port)))
293
294	labels[nerveEndpointLabelPrefix+"_host"] = model.LabelValue(member.Host)
295	labels[nerveEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.Port))
296	labels[nerveEndpointLabelPrefix+"_name"] = model.LabelValue(member.Name)
297
298	return labels, nil
299}
300