package zk

import (
	"errors"
	"net"
	"strings"
	"time"

	"github.com/samuel/go-zookeeper/zk"

	"github.com/go-kit/kit/log"
)

// Package-level defaults and sentinel errors.
var (
	// DefaultACL is the default ACL to use for creating znodes.
	DefaultACL = zk.WorldACL(zk.PermAll)
	// ErrInvalidCredentials is returned by the Credentials option when the
	// user or the password is empty.
	ErrInvalidCredentials = errors.New("invalid credentials provided")
	// ErrClientClosed is returned by CreateParentNodes once the client is no
	// longer active.
	ErrClientClosed = errors.New("client service closed")
	// ErrNotRegistered is returned by Deregister when the service has no
	// registered node recorded.
	ErrNotRegistered = errors.New("not registered")
	// ErrNodeNotFound is returned by Deregister when the znode it wants to
	// delete does not exist.
	ErrNodeNotFound = errors.New("node not found")
)

const (
	// DefaultConnectTimeout is the default timeout to establish a connection to
	// a ZooKeeper node.
	DefaultConnectTimeout = 2 * time.Second
	// DefaultSessionTimeout is the default timeout to keep the current
	// ZooKeeper session alive during a temporary disconnect.
	DefaultSessionTimeout = 5 * time.Second
)

// Client is a wrapper around a lower level ZooKeeper client implementation.
type Client interface {
	// GetEntries should query the provided path in ZooKeeper, place a watch on
	// it and retrieve data from its current child nodes.
	GetEntries(path string) ([]string, <-chan zk.Event, error)
	// CreateParentNodes should try to create the path in case it does not exist
	// yet on ZooKeeper.
	CreateParentNodes(path string) error
	// Register a service with ZooKeeper.
	Register(s *Service) error
	// Deregister a service with ZooKeeper.
	Deregister(s *Service) error
	// Stop should properly shutdown the client implementation
	Stop()
}

// clientConfig collects the settings NewClient assembles from its defaults and
// the supplied Options:
//
//   - logger: the logger handed to NewClient.
//   - acl: ACL applied to znodes created by the client.
//   - credentials: "user:pass" bytes used for digest authentication; empty
//     means no authentication is attempted.
//   - connectTimeout: timeout used by the custom dialer.
//   - sessionTimeout: session timeout passed to zk.Connect.
//   - rootNodePayload: data for the znodes created by CreateParentNodes, one
//     entry per path element.
//   - eventHandler: callback invoked for every connection event.
type clientConfig struct {
	logger          log.Logger
	acl             []zk.ACL
	credentials     []byte
	connectTimeout  time.Duration
	sessionTimeout  time.Duration
	rootNodePayload [][]byte
	eventHandler    func(zk.Event)
}

// Option is a functional option that adjusts the clientConfig used by
// NewClient. A non-nil error returned from an Option makes NewClient fail with
// that error.
type Option func(*clientConfig) error

// client is the Client implementation returned by NewClient. It embeds the
// underlying ZooKeeper connection and the resolved configuration. active is
// true from construction until Stop is called; quit is closed by Stop to end
// the event-dispatching goroutine.
type client struct {
	*zk.Conn
	clientConfig
	active bool
	quit   chan struct{}
}

// ACL returns an Option specifying a non-default ACL for creating parent nodes.
69func ACL(acl []zk.ACL) Option { 70 return func(c *clientConfig) error { 71 c.acl = acl 72 return nil 73 } 74} 75 76// Credentials returns an Option specifying a user/password combination which 77// the client will use to authenticate itself with. 78func Credentials(user, pass string) Option { 79 return func(c *clientConfig) error { 80 if user == "" || pass == "" { 81 return ErrInvalidCredentials 82 } 83 c.credentials = []byte(user + ":" + pass) 84 return nil 85 } 86} 87 88// ConnectTimeout returns an Option specifying a non-default connection timeout 89// when we try to establish a connection to a ZooKeeper server. 90func ConnectTimeout(t time.Duration) Option { 91 return func(c *clientConfig) error { 92 if t.Seconds() < 1 { 93 return errors.New("invalid connect timeout (minimum value is 1 second)") 94 } 95 c.connectTimeout = t 96 return nil 97 } 98} 99 100// SessionTimeout returns an Option specifying a non-default session timeout. 101func SessionTimeout(t time.Duration) Option { 102 return func(c *clientConfig) error { 103 if t.Seconds() < 1 { 104 return errors.New("invalid session timeout (minimum value is 1 second)") 105 } 106 c.sessionTimeout = t 107 return nil 108 } 109} 110 111// Payload returns an Option specifying non-default data values for each znode 112// created by CreateParentNodes. 113func Payload(payload [][]byte) Option { 114 return func(c *clientConfig) error { 115 c.rootNodePayload = payload 116 return nil 117 } 118} 119 120// EventHandler returns an Option specifying a callback function to handle 121// incoming zk.Event payloads (ZooKeeper connection events). 122func EventHandler(handler func(zk.Event)) Option { 123 return func(c *clientConfig) error { 124 c.eventHandler = handler 125 return nil 126 } 127} 128 129// NewClient returns a ZooKeeper client with a connection to the server cluster. 130// It will return an error if the server cluster cannot be resolved. 
131func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) { 132 defaultEventHandler := func(event zk.Event) { 133 logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err) 134 } 135 config := clientConfig{ 136 acl: DefaultACL, 137 connectTimeout: DefaultConnectTimeout, 138 sessionTimeout: DefaultSessionTimeout, 139 eventHandler: defaultEventHandler, 140 logger: logger, 141 } 142 for _, option := range options { 143 if err := option(&config); err != nil { 144 return nil, err 145 } 146 } 147 // dialer overrides the default ZooKeeper library Dialer so we can configure 148 // the connectTimeout. The current library has a hardcoded value of 1 second 149 // and there are reports of race conditions, due to slow DNS resolvers and 150 // other network latency issues. 151 dialer := func(network, address string, _ time.Duration) (net.Conn, error) { 152 return net.DialTimeout(network, address, config.connectTimeout) 153 } 154 conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer)) 155 156 if err != nil { 157 return nil, err 158 } 159 160 if len(config.credentials) > 0 { 161 err = conn.AddAuth("digest", config.credentials) 162 if err != nil { 163 return nil, err 164 } 165 } 166 167 c := &client{conn, config, true, make(chan struct{})} 168 169 // Start listening for incoming Event payloads and callback the set 170 // eventHandler. 171 go func() { 172 for { 173 select { 174 case event := <-eventc: 175 config.eventHandler(event) 176 case <-c.quit: 177 return 178 } 179 } 180 }() 181 return c, nil 182} 183 184// CreateParentNodes implements the ZooKeeper Client interface. 
185func (c *client) CreateParentNodes(path string) error { 186 if !c.active { 187 return ErrClientClosed 188 } 189 if path[0] != '/' { 190 return zk.ErrInvalidPath 191 } 192 payload := []byte("") 193 pathString := "" 194 pathNodes := strings.Split(path, "/") 195 for i := 1; i < len(pathNodes); i++ { 196 if i <= len(c.rootNodePayload) { 197 payload = c.rootNodePayload[i-1] 198 } else { 199 payload = []byte("") 200 } 201 pathString += "/" + pathNodes[i] 202 _, err := c.Create(pathString, payload, 0, c.acl) 203 // not being able to create the node because it exists or not having 204 // sufficient rights is not an issue. It is ok for the node to already 205 // exist and/or us to only have read rights 206 if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth { 207 return err 208 } 209 } 210 return nil 211} 212 213// GetEntries implements the ZooKeeper Client interface. 214func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) { 215 // retrieve list of child nodes for given path and add watch to path 216 znodes, _, eventc, err := c.ChildrenW(path) 217 218 if err != nil { 219 return nil, eventc, err 220 } 221 222 var resp []string 223 for _, znode := range znodes { 224 // retrieve payload for child znode and add to response array 225 if data, _, err := c.Get(path + "/" + znode); err == nil { 226 resp = append(resp, string(data)) 227 } 228 } 229 return resp, eventc, nil 230} 231 232// Register implements the ZooKeeper Client interface. 233func (c *client) Register(s *Service) error { 234 if s.Path[len(s.Path)-1] != '/' { 235 s.Path += "/" 236 } 237 path := s.Path + s.Name 238 if err := c.CreateParentNodes(path); err != nil { 239 return err 240 } 241 if path[len(path)-1] != '/' { 242 path += "/" 243 } 244 node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl) 245 if err != nil { 246 return err 247 } 248 s.node = node 249 return nil 250} 251 252// Deregister implements the ZooKeeper Client interface. 
253func (c *client) Deregister(s *Service) error { 254 if s.node == "" { 255 return ErrNotRegistered 256 } 257 path := s.Path + s.Name 258 found, stat, err := c.Exists(path) 259 if err != nil { 260 return err 261 } 262 if !found { 263 return ErrNodeNotFound 264 } 265 if err := c.Delete(path, stat.Version); err != nil { 266 return err 267 } 268 return nil 269} 270 271// Stop implements the ZooKeeper Client interface. 272func (c *client) Stop() { 273 c.active = false 274 close(c.quit) 275 c.Close() 276} 277