1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v2store
16
17import (
18	"encoding/json"
19	"fmt"
20	"path"
21	"strconv"
22	"strings"
23	"sync"
24	"time"
25
26	"go.etcd.io/etcd/etcdserver/api/v2error"
27	"go.etcd.io/etcd/pkg/types"
28
29	"github.com/jonboulle/clockwork"
30)
31
// defaultVersion is the version number assigned to a store when it is first
// initialized (see newStore).
const defaultVersion = 2
34
// minExpireTime (2000-01-01T00:00:00Z) is the earliest expiration time that
// is honoured as a real TTL. internalCreate treats any expire time before it
// as Permanent.
//
// It is built with time.Date rather than parsed in an init function, so there
// is no parse error to discard and no init-order dependency.
var minExpireTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
40
// Store is the interface of the v2 key-value store. It covers reads, plain
// and conditional writes, watches, snapshot save/restore and TTL expiry.
// The per-method notes below describe the behavior of the store
// implementation in this file.
type Store interface {
	// Version returns the current version of the store.
	Version() int
	// Index returns the current index of the store.
	Index() uint64

	// Get returns a get event for the node at nodePath. recursive includes
	// the content under the node; sorted orders that content by key.
	Get(nodePath string, recursive, sorted bool) (*Event, error)
	// Set creates or replaces the node at nodePath.
	Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
	// Update changes the value and/or TTL of an existing node.
	Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
	// Create creates a new node at nodePath and fails if one already exists.
	// With unique set, a component derived from the next index is appended
	// to nodePath.
	Create(nodePath string, dir bool, value string, unique bool,
		expireOpts TTLOptionSet) (*Event, error)
	// CompareAndSwap writes value to the file at nodePath only if the
	// comparison against prevValue and prevIndex succeeds.
	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
		value string, expireOpts TTLOptionSet) (*Event, error)
	// Delete removes the node at nodePath; recursive implies dir.
	Delete(nodePath string, dir, recursive bool) (*Event, error)
	// CompareAndDelete removes the file at nodePath only if the comparison
	// against prevValue and prevIndex succeeds.
	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)

	// Watch registers a watcher on prefix. A sinceIndex of 0 means "start
	// from the index after the current one".
	Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)

	// Save returns the JSON encoding of a clone of the store.
	Save() ([]byte, error)
	// Recovery restores the store from a JSON-encoded state.
	Recovery(state []byte) error

	// Clone returns a copy of the store.
	Clone() Store
	// SaveNoCopy returns the JSON encoding of the store itself, without
	// cloning it first.
	SaveNoCopy() ([]byte, error)

	// JsonStats returns the encoded store statistics.
	JsonStats() []byte
	// DeleteExpiredKeys removes TTL nodes whose expire time is not after
	// cutoff.
	DeleteExpiredKeys(cutoff time.Time)

	// HasTTLKeys reports whether the store currently tracks any TTL node.
	HasTTLKeys() bool
}
68
// TTLOptionSet carries the expiration options accepted by the write
// operations of Store.
type TTLOptionSet struct {
	// ExpireTime is the requested expiration time of the node. When a node is
	// created, values before minExpireTime are treated as Permanent.
	ExpireTime time.Time
	// Refresh, when true, makes Set, Update and CompareAndSwap keep the
	// node's existing value; the resulting event is marked via SetRefresh and
	// passed to WatcherHub.add instead of WatcherHub.notify.
	Refresh    bool
}
73
// store is the Store implementation in this file. Save and SaveNoCopy hand it
// to encoding/json, which only considers the exported fields; the unexported
// ones are set up by newStore/New and, for ttlKeyHeap, again by Recovery.
type store struct {
	Root           *node
	WatcherHub     *watcherHub
	CurrentIndex   uint64
	Stats          *Stats
	CurrentVersion int
	ttlKeyHeap     *ttlKeyHeap  // not restored by json.Unmarshal; Recovery re-creates it manually
	worldLock      sync.RWMutex // stop the world lock
	clock          clockwork.Clock
	readonlySet    types.Set
}
85
86// New creates a store where the given namespaces will be created as initial directories.
87func New(namespaces ...string) Store {
88	s := newStore(namespaces...)
89	s.clock = clockwork.NewRealClock()
90	return s
91}
92
93func newStore(namespaces ...string) *store {
94	s := new(store)
95	s.CurrentVersion = defaultVersion
96	s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent)
97	for _, namespace := range namespaces {
98		s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent))
99	}
100	s.Stats = newStats()
101	s.WatcherHub = newWatchHub(1000)
102	s.ttlKeyHeap = newTtlKeyHeap()
103	s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
104	return s
105}
106
// Version retrieves current version of the store.
// It reads CurrentVersion directly, without taking worldLock.
func (s *store) Version() int {
	return s.CurrentVersion
}
111
// Index retrieves the current index of the store.
// The read happens while holding the read side of worldLock.
func (s *store) Index() uint64 {
	s.worldLock.RLock()
	defer s.worldLock.RUnlock()
	return s.CurrentIndex
}
118
119// Get returns a get event.
120// If recursive is true, it will return all the content under the node path.
121// If sorted is true, it will sort the content by keys.
122func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
123	var err *v2error.Error
124
125	s.worldLock.RLock()
126	defer s.worldLock.RUnlock()
127
128	defer func() {
129		if err == nil {
130			s.Stats.Inc(GetSuccess)
131			if recursive {
132				reportReadSuccess(GetRecursive)
133			} else {
134				reportReadSuccess(Get)
135			}
136			return
137		}
138
139		s.Stats.Inc(GetFail)
140		if recursive {
141			reportReadFailure(GetRecursive)
142		} else {
143			reportReadFailure(Get)
144		}
145	}()
146
147	n, err := s.internalGet(nodePath)
148	if err != nil {
149		return nil, err
150	}
151
152	e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
153	e.EtcdIndex = s.CurrentIndex
154	e.Node.loadInternalNode(n, recursive, sorted, s.clock)
155
156	return e, nil
157}
158
159// Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
160// If the node has already existed, create will fail.
161// If any node on the path is a file, create will fail.
162func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
163	var err *v2error.Error
164
165	s.worldLock.Lock()
166	defer s.worldLock.Unlock()
167
168	defer func() {
169		if err == nil {
170			s.Stats.Inc(CreateSuccess)
171			reportWriteSuccess(Create)
172			return
173		}
174
175		s.Stats.Inc(CreateFail)
176		reportWriteFailure(Create)
177	}()
178
179	e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
180	if err != nil {
181		return nil, err
182	}
183
184	e.EtcdIndex = s.CurrentIndex
185	s.WatcherHub.notify(e)
186
187	return e, nil
188}
189
190// Set creates or replace the node at nodePath.
191func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) {
192	var err *v2error.Error
193
194	s.worldLock.Lock()
195	defer s.worldLock.Unlock()
196
197	defer func() {
198		if err == nil {
199			s.Stats.Inc(SetSuccess)
200			reportWriteSuccess(Set)
201			return
202		}
203
204		s.Stats.Inc(SetFail)
205		reportWriteFailure(Set)
206	}()
207
208	// Get prevNode value
209	n, getErr := s.internalGet(nodePath)
210	if getErr != nil && getErr.ErrorCode != v2error.EcodeKeyNotFound {
211		err = getErr
212		return nil, err
213	}
214
215	if expireOpts.Refresh {
216		if getErr != nil {
217			err = getErr
218			return nil, err
219		}
220		value = n.Value
221	}
222
223	// Set new value
224	e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set)
225	if err != nil {
226		return nil, err
227	}
228	e.EtcdIndex = s.CurrentIndex
229
230	// Put prevNode into event
231	if getErr == nil {
232		prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
233		prev.Node.loadInternalNode(n, false, false, s.clock)
234		e.PrevNode = prev.Node
235	}
236
237	if !expireOpts.Refresh {
238		s.WatcherHub.notify(e)
239	} else {
240		e.SetRefresh()
241		s.WatcherHub.add(e)
242	}
243
244	return e, nil
245}
246
247// returns user-readable cause of failed comparison
248func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
249	switch which {
250	case CompareIndexNotMatch:
251		return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
252	case CompareValueNotMatch:
253		return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
254	default:
255		return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
256	}
257}
258
259func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
260	value string, expireOpts TTLOptionSet) (*Event, error) {
261
262	var err *v2error.Error
263
264	s.worldLock.Lock()
265	defer s.worldLock.Unlock()
266
267	defer func() {
268		if err == nil {
269			s.Stats.Inc(CompareAndSwapSuccess)
270			reportWriteSuccess(CompareAndSwap)
271			return
272		}
273
274		s.Stats.Inc(CompareAndSwapFail)
275		reportWriteFailure(CompareAndSwap)
276	}()
277
278	nodePath = path.Clean(path.Join("/", nodePath))
279	// we do not allow the user to change "/"
280	if s.readonlySet.Contains(nodePath) {
281		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
282	}
283
284	n, err := s.internalGet(nodePath)
285	if err != nil {
286		return nil, err
287	}
288	if n.IsDir() { // can only compare and swap file
289		err = v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex)
290		return nil, err
291	}
292
293	// If both of the prevValue and prevIndex are given, we will test both of them.
294	// Command will be executed, only if both of the tests are successful.
295	if ok, which := n.Compare(prevValue, prevIndex); !ok {
296		cause := getCompareFailCause(n, which, prevValue, prevIndex)
297		err = v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex)
298		return nil, err
299	}
300
301	if expireOpts.Refresh {
302		value = n.Value
303	}
304
305	// update etcd index
306	s.CurrentIndex++
307
308	e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
309	e.EtcdIndex = s.CurrentIndex
310	e.PrevNode = n.Repr(false, false, s.clock)
311	eNode := e.Node
312
313	// if test succeed, write the value
314	n.Write(value, s.CurrentIndex)
315	n.UpdateTTL(expireOpts.ExpireTime)
316
317	// copy the value for safety
318	valueCopy := value
319	eNode.Value = &valueCopy
320	eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
321
322	if !expireOpts.Refresh {
323		s.WatcherHub.notify(e)
324	} else {
325		e.SetRefresh()
326		s.WatcherHub.add(e)
327	}
328
329	return e, nil
330}
331
332// Delete deletes the node at the given path.
333// If the node is a directory, recursive must be true to delete it.
334func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
335	var err *v2error.Error
336
337	s.worldLock.Lock()
338	defer s.worldLock.Unlock()
339
340	defer func() {
341		if err == nil {
342			s.Stats.Inc(DeleteSuccess)
343			reportWriteSuccess(Delete)
344			return
345		}
346
347		s.Stats.Inc(DeleteFail)
348		reportWriteFailure(Delete)
349	}()
350
351	nodePath = path.Clean(path.Join("/", nodePath))
352	// we do not allow the user to change "/"
353	if s.readonlySet.Contains(nodePath) {
354		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
355	}
356
357	// recursive implies dir
358	if recursive {
359		dir = true
360	}
361
362	n, err := s.internalGet(nodePath)
363	if err != nil { // if the node does not exist, return error
364		return nil, err
365	}
366
367	nextIndex := s.CurrentIndex + 1
368	e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
369	e.EtcdIndex = nextIndex
370	e.PrevNode = n.Repr(false, false, s.clock)
371	eNode := e.Node
372
373	if n.IsDir() {
374		eNode.Dir = true
375	}
376
377	callback := func(path string) { // notify function
378		// notify the watchers with deleted set true
379		s.WatcherHub.notifyWatchers(e, path, true)
380	}
381
382	err = n.Remove(dir, recursive, callback)
383	if err != nil {
384		return nil, err
385	}
386
387	// update etcd index
388	s.CurrentIndex++
389
390	s.WatcherHub.notify(e)
391
392	return e, nil
393}
394
395func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
396	var err *v2error.Error
397
398	s.worldLock.Lock()
399	defer s.worldLock.Unlock()
400
401	defer func() {
402		if err == nil {
403			s.Stats.Inc(CompareAndDeleteSuccess)
404			reportWriteSuccess(CompareAndDelete)
405			return
406		}
407
408		s.Stats.Inc(CompareAndDeleteFail)
409		reportWriteFailure(CompareAndDelete)
410	}()
411
412	nodePath = path.Clean(path.Join("/", nodePath))
413
414	n, err := s.internalGet(nodePath)
415	if err != nil { // if the node does not exist, return error
416		return nil, err
417	}
418	if n.IsDir() { // can only compare and delete file
419		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex)
420	}
421
422	// If both of the prevValue and prevIndex are given, we will test both of them.
423	// Command will be executed, only if both of the tests are successful.
424	if ok, which := n.Compare(prevValue, prevIndex); !ok {
425		cause := getCompareFailCause(n, which, prevValue, prevIndex)
426		return nil, v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex)
427	}
428
429	// update etcd index
430	s.CurrentIndex++
431
432	e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
433	e.EtcdIndex = s.CurrentIndex
434	e.PrevNode = n.Repr(false, false, s.clock)
435
436	callback := func(path string) { // notify function
437		// notify the watchers with deleted set true
438		s.WatcherHub.notifyWatchers(e, path, true)
439	}
440
441	err = n.Remove(false, false, callback)
442	if err != nil {
443		return nil, err
444	}
445
446	s.WatcherHub.notify(e)
447
448	return e, nil
449}
450
451func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
452	s.worldLock.RLock()
453	defer s.worldLock.RUnlock()
454
455	key = path.Clean(path.Join("/", key))
456	if sinceIndex == 0 {
457		sinceIndex = s.CurrentIndex + 1
458	}
459	// WatcherHub does not know about the current index, so we need to pass it in
460	w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
461	if err != nil {
462		return nil, err
463	}
464
465	return w, nil
466}
467
468// walk walks all the nodePath and apply the walkFunc on each directory
469func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *v2error.Error)) (*node, *v2error.Error) {
470	components := strings.Split(nodePath, "/")
471
472	curr := s.Root
473	var err *v2error.Error
474
475	for i := 1; i < len(components); i++ {
476		if len(components[i]) == 0 { // ignore empty string
477			return curr, nil
478		}
479
480		curr, err = walkFunc(curr, components[i])
481		if err != nil {
482			return nil, err
483		}
484	}
485
486	return curr, nil
487}
488
489// Update updates the value/ttl of the node.
490// If the node is a file, the value and the ttl can be updated.
491// If the node is a directory, only the ttl can be updated.
492func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) {
493	var err *v2error.Error
494
495	s.worldLock.Lock()
496	defer s.worldLock.Unlock()
497
498	defer func() {
499		if err == nil {
500			s.Stats.Inc(UpdateSuccess)
501			reportWriteSuccess(Update)
502			return
503		}
504
505		s.Stats.Inc(UpdateFail)
506		reportWriteFailure(Update)
507	}()
508
509	nodePath = path.Clean(path.Join("/", nodePath))
510	// we do not allow the user to change "/"
511	if s.readonlySet.Contains(nodePath) {
512		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
513	}
514
515	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
516
517	n, err := s.internalGet(nodePath)
518	if err != nil { // if the node does not exist, return error
519		return nil, err
520	}
521	if n.IsDir() && len(newValue) != 0 {
522		// if the node is a directory, we cannot update value to non-empty
523		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
524	}
525
526	if expireOpts.Refresh {
527		newValue = n.Value
528	}
529
530	e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
531	e.EtcdIndex = nextIndex
532	e.PrevNode = n.Repr(false, false, s.clock)
533	eNode := e.Node
534
535	n.Write(newValue, nextIndex)
536
537	if n.IsDir() {
538		eNode.Dir = true
539	} else {
540		// copy the value for safety
541		newValueCopy := newValue
542		eNode.Value = &newValueCopy
543	}
544
545	// update ttl
546	n.UpdateTTL(expireOpts.ExpireTime)
547
548	eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
549
550	if !expireOpts.Refresh {
551		s.WatcherHub.notify(e)
552	} else {
553		e.SetRefresh()
554		s.WatcherHub.add(e)
555	}
556
557	s.CurrentIndex = nextIndex
558
559	return e, nil
560}
561
562func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
563	expireTime time.Time, action string) (*Event, *v2error.Error) {
564
565	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
566
567	if unique { // append unique item under the node path
568		nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
569	}
570
571	nodePath = path.Clean(path.Join("/", nodePath))
572
573	// we do not allow the user to change "/"
574	if s.readonlySet.Contains(nodePath) {
575		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", currIndex)
576	}
577
578	// Assume expire times that are way in the past are
579	// This can occur when the time is serialized to JS
580	if expireTime.Before(minExpireTime) {
581		expireTime = Permanent
582	}
583
584	dirName, nodeName := path.Split(nodePath)
585
586	// walk through the nodePath, create dirs and get the last directory node
587	d, err := s.walk(dirName, s.checkDir)
588
589	if err != nil {
590		s.Stats.Inc(SetFail)
591		reportWriteFailure(action)
592		err.Index = currIndex
593		return nil, err
594	}
595
596	e := newEvent(action, nodePath, nextIndex, nextIndex)
597	eNode := e.Node
598
599	n, _ := d.GetChild(nodeName)
600
601	// force will try to replace an existing file
602	if n != nil {
603		if replace {
604			if n.IsDir() {
605				return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
606			}
607			e.PrevNode = n.Repr(false, false, s.clock)
608
609			n.Remove(false, false, nil)
610		} else {
611			return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, currIndex)
612		}
613	}
614
615	if !dir { // create file
616		// copy the value for safety
617		valueCopy := value
618		eNode.Value = &valueCopy
619
620		n = newKV(s, nodePath, value, nextIndex, d, expireTime)
621
622	} else { // create directory
623		eNode.Dir = true
624
625		n = newDir(s, nodePath, nextIndex, d, expireTime)
626	}
627
628	// we are sure d is a directory and does not have the children with name n.Name
629	d.Add(n)
630
631	// node with TTL
632	if !n.IsPermanent() {
633		s.ttlKeyHeap.push(n)
634
635		eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
636	}
637
638	s.CurrentIndex = nextIndex
639
640	return e, nil
641}
642
643// InternalGet gets the node of the given nodePath.
644func (s *store) internalGet(nodePath string) (*node, *v2error.Error) {
645	nodePath = path.Clean(path.Join("/", nodePath))
646
647	walkFunc := func(parent *node, name string) (*node, *v2error.Error) {
648
649		if !parent.IsDir() {
650			err := v2error.NewError(v2error.EcodeNotDir, parent.Path, s.CurrentIndex)
651			return nil, err
652		}
653
654		child, ok := parent.Children[name]
655		if ok {
656			return child, nil
657		}
658
659		return nil, v2error.NewError(v2error.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
660	}
661
662	f, err := s.walk(nodePath, walkFunc)
663
664	if err != nil {
665		return nil, err
666	}
667	return f, nil
668}
669
670// DeleteExpiredKeys will delete all expired keys
671func (s *store) DeleteExpiredKeys(cutoff time.Time) {
672	s.worldLock.Lock()
673	defer s.worldLock.Unlock()
674
675	for {
676		node := s.ttlKeyHeap.top()
677		if node == nil || node.ExpireTime.After(cutoff) {
678			break
679		}
680
681		s.CurrentIndex++
682		e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
683		e.EtcdIndex = s.CurrentIndex
684		e.PrevNode = node.Repr(false, false, s.clock)
685		if node.IsDir() {
686			e.Node.Dir = true
687		}
688
689		callback := func(path string) { // notify function
690			// notify the watchers with deleted set true
691			s.WatcherHub.notifyWatchers(e, path, true)
692		}
693
694		s.ttlKeyHeap.pop()
695		node.Remove(true, true, callback)
696
697		reportExpiredKey()
698		s.Stats.Inc(ExpireCount)
699
700		s.WatcherHub.notify(e)
701	}
702
703}
704
705// checkDir will check whether the component is a directory under parent node.
706// If it is a directory, this function will return the pointer to that node.
707// If it does not exist, this function will create a new directory and return the pointer to that node.
708// If it is a file, this function will return error.
709func (s *store) checkDir(parent *node, dirName string) (*node, *v2error.Error) {
710	node, ok := parent.Children[dirName]
711
712	if ok {
713		if node.IsDir() {
714			return node, nil
715		}
716
717		return nil, v2error.NewError(v2error.EcodeNotDir, node.Path, s.CurrentIndex)
718	}
719
720	n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
721
722	parent.Children[dirName] = n
723
724	return n, nil
725}
726
727// Save saves the static state of the store system.
728// It will not be able to save the state of watchers.
729// It will not save the parent field of the node. Or there will
730// be cyclic dependencies issue for the json package.
731func (s *store) Save() ([]byte, error) {
732	b, err := json.Marshal(s.Clone())
733	if err != nil {
734		return nil, err
735	}
736
737	return b, nil
738}
739
740func (s *store) SaveNoCopy() ([]byte, error) {
741	b, err := json.Marshal(s)
742	if err != nil {
743		return nil, err
744	}
745
746	return b, nil
747}
748
749func (s *store) Clone() Store {
750	s.worldLock.Lock()
751
752	clonedStore := newStore()
753	clonedStore.CurrentIndex = s.CurrentIndex
754	clonedStore.Root = s.Root.Clone()
755	clonedStore.WatcherHub = s.WatcherHub.clone()
756	clonedStore.Stats = s.Stats.clone()
757	clonedStore.CurrentVersion = s.CurrentVersion
758
759	s.worldLock.Unlock()
760	return clonedStore
761}
762
763// Recovery recovers the store system from a static state
764// It needs to recover the parent field of the nodes.
765// It needs to delete the expired nodes since the saved time and also
766// needs to create monitoring go routines.
767func (s *store) Recovery(state []byte) error {
768	s.worldLock.Lock()
769	defer s.worldLock.Unlock()
770	err := json.Unmarshal(state, s)
771
772	if err != nil {
773		return err
774	}
775
776	s.ttlKeyHeap = newTtlKeyHeap()
777
778	s.Root.recoverAndclean()
779	return nil
780}
781
// JsonStats stores the watcher hub's current count in Stats.Watchers and
// returns the stats as encoded by Stats.toJson.
// NOTE(review): no lock is taken here - confirm that WatcherHub.count and
// Stats are safe to access concurrently with writers.
func (s *store) JsonStats() []byte {
	s.Stats.Watchers = uint64(s.WatcherHub.count)
	return s.Stats.toJson()
}
786
// HasTTLKeys reports whether the TTL key heap currently holds any node,
// i.e. whether its length is non-zero. The check runs under the read side
// of worldLock.
func (s *store) HasTTLKeys() bool {
	s.worldLock.RLock()
	defer s.worldLock.RUnlock()
	return s.ttlKeyHeap.Len() != 0
}
792