1package zookeeper 2 3import ( 4 "strings" 5 "time" 6 7 "github.com/docker/libkv" 8 "github.com/docker/libkv/store" 9 zk "github.com/samuel/go-zookeeper/zk" 10) 11 12const ( 13 // SOH control character 14 SOH = "\x01" 15 16 defaultTimeout = 10 * time.Second 17) 18 19// Zookeeper is the receiver type for 20// the Store interface 21type Zookeeper struct { 22 timeout time.Duration 23 client *zk.Conn 24} 25 26type zookeeperLock struct { 27 client *zk.Conn 28 lock *zk.Lock 29 key string 30 value []byte 31} 32 33// Register registers zookeeper to libkv 34func Register() { 35 libkv.AddStore(store.ZK, New) 36} 37 38// New creates a new Zookeeper client given a 39// list of endpoints and an optional tls config 40func New(endpoints []string, options *store.Config) (store.Store, error) { 41 s := &Zookeeper{} 42 s.timeout = defaultTimeout 43 44 // Set options 45 if options != nil { 46 if options.ConnectionTimeout != 0 { 47 s.setTimeout(options.ConnectionTimeout) 48 } 49 } 50 51 // Connect to Zookeeper 52 conn, _, err := zk.Connect(endpoints, s.timeout) 53 if err != nil { 54 return nil, err 55 } 56 s.client = conn 57 58 return s, nil 59} 60 61// setTimeout sets the timeout for connecting to Zookeeper 62func (s *Zookeeper) setTimeout(time time.Duration) { 63 s.timeout = time 64} 65 66// Get the value at "key", returns the last modified index 67// to use in conjunction to Atomic calls 68func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { 69 resp, meta, err := s.client.Get(s.normalize(key)) 70 71 if err != nil { 72 if err == zk.ErrNoNode { 73 return nil, store.ErrKeyNotFound 74 } 75 return nil, err 76 } 77 78 // FIXME handle very rare cases where Get returns the 79 // SOH control character instead of the actual value 80 if string(resp) == SOH { 81 return s.Get(store.Normalize(key)) 82 } 83 84 pair = &store.KVPair{ 85 Key: key, 86 Value: resp, 87 LastIndex: uint64(meta.Version), 88 } 89 90 return pair, nil 91} 92 93// createFullPath creates the entire path for a directory 94// that does not exist 95func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { 96 for i := 1; i <= len(path); i++ { 97 newpath := "/" + strings.Join(path[:i], "/") 98 if i == len(path) && ephemeral { 99 _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) 100 return err 101 } 102 _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) 103 if err != nil { 104 // Skip if node already exists 105 if err != zk.ErrNodeExists { 106 return err 107 } 108 } 109 } 110 return nil 111} 112 113// Put a value at "key" 114func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error { 115 fkey := s.normalize(key) 116 117 exists, err := s.Exists(key) 118 if err != nil { 119 return err 120 } 121 122 if !exists { 123 if opts != nil && opts.TTL > 0 { 124 s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) 125 } else { 126 s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) 127 } 128 } 129 130 _, err = s.client.Set(fkey, value, -1) 131 return err 132} 133 134// Delete a value at "key" 135func (s *Zookeeper) Delete(key string) error { 136 err := s.client.Delete(s.normalize(key), -1) 137 if err == zk.ErrNoNode { 138 return store.ErrKeyNotFound 139 } 140 return err 141} 142 143// Exists checks if the key exists inside the store 144func (s *Zookeeper) Exists(key string) (bool, error) { 145 exists, _, err := s.client.Exists(s.normalize(key)) 146 if err != nil { 147 return false, err 148 } 149 return exists, nil 150} 151 152// Watch for changes on a "key" 153// It returns a channel that will receive changes or pass 154// on errors. Upon creation, the current value will first 155// be sent to the channel. Providing a non-nil stopCh can 156// be used to stop watching. 157func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { 158 // Get the key first 159 pair, err := s.Get(key) 160 if err != nil { 161 return nil, err 162 } 163 164 // Catch zk notifications and fire changes into the channel. 165 watchCh := make(chan *store.KVPair) 166 go func() { 167 defer close(watchCh) 168 169 // Get returns the current value to the channel prior 170 // to listening to any event that may occur on that key 171 watchCh <- pair 172 for { 173 _, _, eventCh, err := s.client.GetW(s.normalize(key)) 174 if err != nil { 175 return 176 } 177 select { 178 case e := <-eventCh: 179 if e.Type == zk.EventNodeDataChanged { 180 if entry, err := s.Get(key); err == nil { 181 watchCh <- entry 182 } 183 } 184 case <-stopCh: 185 // There is no way to stop GetW so just quit 186 return 187 } 188 } 189 }() 190 191 return watchCh, nil 192} 193 194// WatchTree watches for changes on a "directory" 195// It returns a channel that will receive changes or pass 196// on errors. Upon creating a watch, the current childs values 197// will be sent to the channel .Providing a non-nil stopCh can 198// be used to stop watching. 199func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { 200 // List the childrens first 201 entries, err := s.List(directory) 202 if err != nil { 203 return nil, err 204 } 205 206 // Catch zk notifications and fire changes into the channel. 207 watchCh := make(chan []*store.KVPair) 208 go func() { 209 defer close(watchCh) 210 211 // List returns the children values to the channel 212 // prior to listening to any events that may occur 213 // on those keys 214 watchCh <- entries 215 216 for { 217 _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) 218 if err != nil { 219 return 220 } 221 select { 222 case e := <-eventCh: 223 if e.Type == zk.EventNodeChildrenChanged { 224 if kv, err := s.List(directory); err == nil { 225 watchCh <- kv 226 } 227 } 228 case <-stopCh: 229 // There is no way to stop GetW so just quit 230 return 231 } 232 } 233 }() 234 235 return watchCh, nil 236} 237 238// List child nodes of a given directory 239func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { 240 keys, stat, err := s.client.Children(s.normalize(directory)) 241 if err != nil { 242 if err == zk.ErrNoNode { 243 return nil, store.ErrKeyNotFound 244 } 245 return nil, err 246 } 247 248 kv := []*store.KVPair{} 249 250 // FIXME Costly Get request for each child key.. 251 for _, key := range keys { 252 pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) 253 if err != nil { 254 // If node is not found: List is out of date, retry 255 if err == store.ErrKeyNotFound { 256 return s.List(directory) 257 } 258 return nil, err 259 } 260 261 kv = append(kv, &store.KVPair{ 262 Key: key, 263 Value: []byte(pair.Value), 264 LastIndex: uint64(stat.Version), 265 }) 266 } 267 268 return kv, nil 269} 270 271// DeleteTree deletes a range of keys under a given directory 272func (s *Zookeeper) DeleteTree(directory string) error { 273 pairs, err := s.List(directory) 274 if err != nil { 275 return err 276 } 277 278 var reqs []interface{} 279 280 for _, pair := range pairs { 281 reqs = append(reqs, &zk.DeleteRequest{ 282 Path: s.normalize(directory + "/" + pair.Key), 283 Version: -1, 284 }) 285 } 286 287 _, err = s.client.Multi(reqs...) 288 return err 289} 290 291// AtomicPut put a value at "key" if the key has not been 292// modified in the meantime, throws an error if this is the case 293func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) { 294 var lastIndex uint64 295 296 if previous != nil { 297 meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex)) 298 if err != nil { 299 // Compare Failed 300 if err == zk.ErrBadVersion { 301 return false, nil, store.ErrKeyModified 302 } 303 return false, nil, err 304 } 305 lastIndex = uint64(meta.Version) 306 } else { 307 // Interpret previous == nil as create operation. 308 _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)) 309 if err != nil { 310 // Directory does not exist 311 if err == zk.ErrNoNode { 312 313 // Create the directory 314 parts := store.SplitKey(strings.TrimSuffix(key, "/")) 315 parts = parts[:len(parts)-1] 316 if err = s.createFullPath(parts, false); err != nil { 317 // Failed to create the directory. 318 return false, nil, err 319 } 320 321 // Create the node 322 if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { 323 // Node exist error (when previous nil) 324 if err == zk.ErrNodeExists { 325 return false, nil, store.ErrKeyExists 326 } 327 return false, nil, err 328 } 329 330 } else { 331 // Node Exists error (when previous nil) 332 if err == zk.ErrNodeExists { 333 return false, nil, store.ErrKeyExists 334 } 335 336 // Unhandled error 337 return false, nil, err 338 } 339 } 340 lastIndex = 0 // Newly created nodes have version 0. 341 } 342 343 pair := &store.KVPair{ 344 Key: key, 345 Value: value, 346 LastIndex: lastIndex, 347 } 348 349 return true, pair, nil 350} 351 352// AtomicDelete deletes a value at "key" if the key 353// has not been modified in the meantime, throws an 354// error if this is the case 355func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) { 356 if previous == nil { 357 return false, store.ErrPreviousNotSpecified 358 } 359 360 err := s.client.Delete(s.normalize(key), int32(previous.LastIndex)) 361 if err != nil { 362 // Key not found 363 if err == zk.ErrNoNode { 364 return false, store.ErrKeyNotFound 365 } 366 // Compare failed 367 if err == zk.ErrBadVersion { 368 return false, store.ErrKeyModified 369 } 370 // General store error 371 return false, err 372 } 373 return true, nil 374} 375 376// NewLock returns a handle to a lock struct which can 377// be used to provide mutual exclusion on a key 378func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { 379 value := []byte("") 380 381 // Apply options 382 if options != nil { 383 if options.Value != nil { 384 value = options.Value 385 } 386 } 387 388 lock = &zookeeperLock{ 389 client: s.client, 390 key: s.normalize(key), 391 value: value, 392 lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)), 393 } 394 395 return lock, err 396} 397 398// Lock attempts to acquire the lock and blocks while 399// doing so. It returns a channel that is closed if our 400// lock is lost or if an error occurs 401func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { 402 err := l.lock.Lock() 403 404 if err == nil { 405 // We hold the lock, we can set our value 406 // FIXME: The value is left behind 407 // (problematic for leader election) 408 _, err = l.client.Set(l.key, l.value, -1) 409 } 410 411 return make(chan struct{}), err 412} 413 414// Unlock the "key". Calling unlock while 415// not holding the lock will throw an error 416func (l *zookeeperLock) Unlock() error { 417 return l.lock.Unlock() 418} 419 420// Close closes the client connection 421func (s *Zookeeper) Close() { 422 s.client.Close() 423} 424 425// Normalize the key for usage in Zookeeper 426func (s *Zookeeper) normalize(key string) string { 427 key = store.Normalize(key) 428 return strings.TrimSuffix(key, "/") 429} 430