1package zk
2
3import (
4	"errors"
5	"net"
6	"strings"
7	"time"
8
9	"github.com/samuel/go-zookeeper/zk"
10
11	"github.com/go-kit/kit/log"
12)
13
// Package-level defaults and sentinel errors.
//
// DefaultACL is the default ACL to use for creating znodes; NewClient uses it
// unless the ACL option supplies a different one.
//
// ErrInvalidCredentials is returned by the Credentials option when the user or
// the password is empty. ErrClientClosed is returned by CreateParentNodes once
// the client is no longer active. ErrNotRegistered is returned by Deregister
// for a service that has no registered node. ErrNodeNotFound is returned by
// Deregister when the znode it is about to delete does not exist.
var (
	DefaultACL            = zk.WorldACL(zk.PermAll)
	ErrInvalidCredentials = errors.New("invalid credentials provided")
	ErrClientClosed       = errors.New("client service closed")
	ErrNotRegistered      = errors.New("not registered")
	ErrNodeNotFound       = errors.New("node not found")
)
22
const (
	// DefaultConnectTimeout is the default timeout to establish a connection to
	// a ZooKeeper node. NewClient uses it unless the ConnectTimeout option
	// overrides it.
	DefaultConnectTimeout = 2 * time.Second
	// DefaultSessionTimeout is the default timeout to keep the current
	// ZooKeeper session alive during a temporary disconnect. NewClient uses it
	// unless the SessionTimeout option overrides it.
	DefaultSessionTimeout = 5 * time.Second
)
31
// Client is a wrapper around a lower level ZooKeeper client implementation.
// NewClient returns the implementation backed by a ZooKeeper connection.
type Client interface {
	// GetEntries should query the provided path in ZooKeeper, place a watch on
	// it and retrieve data from its current child nodes. It returns the data
	// of the child nodes together with the channel on which the watch event
	// is delivered.
	GetEntries(path string) ([]string, <-chan zk.Event, error)
	// CreateParentNodes should try to create the path in case it does not exist
	// yet on ZooKeeper.
	CreateParentNodes(path string) error
	// Register should register a service with ZooKeeper.
	Register(s *Service) error
	// Deregister should remove a service's registration from ZooKeeper.
	Deregister(s *Service) error
	// Stop should properly shut down the client implementation.
	Stop()
}
47
// clientConfig holds the settings a client is built with. NewClient fills it
// with defaults and then lets each Option modify it.
type clientConfig struct {
	logger          log.Logger     // logger passed to NewClient
	acl             []zk.ACL       // ACL used when creating znodes
	credentials     []byte         // "user:pass" for digest auth; empty disables auth
	connectTimeout  time.Duration  // dial timeout used by the custom dialer
	sessionTimeout  time.Duration  // session timeout handed to zk.Connect
	rootNodePayload [][]byte       // per-level data for CreateParentNodes
	eventHandler    func(zk.Event) // callback for connection events
}
57
// Option configures the client created by NewClient. Options are applied in
// the order they are passed; an Option that returns a non-nil error makes
// NewClient stop and return that error.
type Option func(*clientConfig) error
60
// client is the Client implementation returned by NewClient. It embeds the
// ZooKeeper connection and the configuration the client was built with.
//
// active is true from construction until Stop is called; CreateParentNodes
// refuses to run once it is false. quit is closed by Stop and ends the
// goroutine that NewClient starts to dispatch connection events.
type client struct {
	*zk.Conn
	clientConfig
	active bool
	quit   chan struct{}
}
67
68// ACL returns an Option specifying a non-default ACL for creating parent nodes.
69func ACL(acl []zk.ACL) Option {
70	return func(c *clientConfig) error {
71		c.acl = acl
72		return nil
73	}
74}
75
76// Credentials returns an Option specifying a user/password combination which
77// the client will use to authenticate itself with.
78func Credentials(user, pass string) Option {
79	return func(c *clientConfig) error {
80		if user == "" || pass == "" {
81			return ErrInvalidCredentials
82		}
83		c.credentials = []byte(user + ":" + pass)
84		return nil
85	}
86}
87
88// ConnectTimeout returns an Option specifying a non-default connection timeout
89// when we try to establish a connection to a ZooKeeper server.
90func ConnectTimeout(t time.Duration) Option {
91	return func(c *clientConfig) error {
92		if t.Seconds() < 1 {
93			return errors.New("invalid connect timeout (minimum value is 1 second)")
94		}
95		c.connectTimeout = t
96		return nil
97	}
98}
99
100// SessionTimeout returns an Option specifying a non-default session timeout.
101func SessionTimeout(t time.Duration) Option {
102	return func(c *clientConfig) error {
103		if t.Seconds() < 1 {
104			return errors.New("invalid session timeout (minimum value is 1 second)")
105		}
106		c.sessionTimeout = t
107		return nil
108	}
109}
110
111// Payload returns an Option specifying non-default data values for each znode
112// created by CreateParentNodes.
113func Payload(payload [][]byte) Option {
114	return func(c *clientConfig) error {
115		c.rootNodePayload = payload
116		return nil
117	}
118}
119
120// EventHandler returns an Option specifying a callback function to handle
121// incoming zk.Event payloads (ZooKeeper connection events).
122func EventHandler(handler func(zk.Event)) Option {
123	return func(c *clientConfig) error {
124		c.eventHandler = handler
125		return nil
126	}
127}
128
129// NewClient returns a ZooKeeper client with a connection to the server cluster.
130// It will return an error if the server cluster cannot be resolved.
131func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
132	defaultEventHandler := func(event zk.Event) {
133		logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
134	}
135	config := clientConfig{
136		acl:            DefaultACL,
137		connectTimeout: DefaultConnectTimeout,
138		sessionTimeout: DefaultSessionTimeout,
139		eventHandler:   defaultEventHandler,
140		logger:         logger,
141	}
142	for _, option := range options {
143		if err := option(&config); err != nil {
144			return nil, err
145		}
146	}
147	// dialer overrides the default ZooKeeper library Dialer so we can configure
148	// the connectTimeout. The current library has a hardcoded value of 1 second
149	// and there are reports of race conditions, due to slow DNS resolvers and
150	// other network latency issues.
151	dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
152		return net.DialTimeout(network, address, config.connectTimeout)
153	}
154	conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
155
156	if err != nil {
157		return nil, err
158	}
159
160	if len(config.credentials) > 0 {
161		err = conn.AddAuth("digest", config.credentials)
162		if err != nil {
163			return nil, err
164		}
165	}
166
167	c := &client{conn, config, true, make(chan struct{})}
168
169	// Start listening for incoming Event payloads and callback the set
170	// eventHandler.
171	go func() {
172		for {
173			select {
174			case event := <-eventc:
175				config.eventHandler(event)
176			case <-c.quit:
177				return
178			}
179		}
180	}()
181	return c, nil
182}
183
184// CreateParentNodes implements the ZooKeeper Client interface.
185func (c *client) CreateParentNodes(path string) error {
186	if !c.active {
187		return ErrClientClosed
188	}
189	if path[0] != '/' {
190		return zk.ErrInvalidPath
191	}
192	payload := []byte("")
193	pathString := ""
194	pathNodes := strings.Split(path, "/")
195	for i := 1; i < len(pathNodes); i++ {
196		if i <= len(c.rootNodePayload) {
197			payload = c.rootNodePayload[i-1]
198		} else {
199			payload = []byte("")
200		}
201		pathString += "/" + pathNodes[i]
202		_, err := c.Create(pathString, payload, 0, c.acl)
203		// not being able to create the node because it exists or not having
204		// sufficient rights is not an issue. It is ok for the node to already
205		// exist and/or us to only have read rights
206		if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
207			return err
208		}
209	}
210	return nil
211}
212
213// GetEntries implements the ZooKeeper Client interface.
214func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
215	// retrieve list of child nodes for given path and add watch to path
216	znodes, _, eventc, err := c.ChildrenW(path)
217
218	if err != nil {
219		return nil, eventc, err
220	}
221
222	var resp []string
223	for _, znode := range znodes {
224		// retrieve payload for child znode and add to response array
225		if data, _, err := c.Get(path + "/" + znode); err == nil {
226			resp = append(resp, string(data))
227		}
228	}
229	return resp, eventc, nil
230}
231
232// Register implements the ZooKeeper Client interface.
233func (c *client) Register(s *Service) error {
234	if s.Path[len(s.Path)-1] != '/' {
235		s.Path += "/"
236	}
237	path := s.Path + s.Name
238	if err := c.CreateParentNodes(path); err != nil {
239		return err
240	}
241	if path[len(path)-1] != '/' {
242		path += "/"
243	}
244	node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
245	if err != nil {
246		return err
247	}
248	s.node = node
249	return nil
250}
251
252// Deregister implements the ZooKeeper Client interface.
253func (c *client) Deregister(s *Service) error {
254	if s.node == "" {
255		return ErrNotRegistered
256	}
257	path := s.Path + s.Name
258	found, stat, err := c.Exists(path)
259	if err != nil {
260		return err
261	}
262	if !found {
263		return ErrNodeNotFound
264	}
265	if err := c.Delete(path, stat.Version); err != nil {
266		return err
267	}
268	return nil
269}
270
271// Stop implements the ZooKeeper Client interface.
272func (c *client) Stop() {
273	c.active = false
274	close(c.quit)
275	c.Close()
276}
277