1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v2v3
16
17import (
18	"context"
19	"fmt"
20	"path"
21	"sort"
22	"strings"
23	"time"
24
25	"go.etcd.io/etcd/clientv3"
26	"go.etcd.io/etcd/clientv3/concurrency"
27	"go.etcd.io/etcd/etcdserver/api/v2error"
28	"go.etcd.io/etcd/etcdserver/api/v2store"
29	"go.etcd.io/etcd/mvcc/mvccpb"
30)
31
// v2v3Store implements the v2store.Store interface for V2 using
// a v3 client.
type v2v3Store struct {
	// c is the v3 client every store operation is issued through.
	c *clientv3.Client
	// pfx is the v3 prefix where keys should be stored.
	pfx string
	// ctx is the context handed to v3 requests; newStore sets it to c.Ctx().
	ctx context.Context
}
40
// maxPathDepth bounds the directory depths a recursive Delete covers.
const maxPathDepth = 63

// errUnsupported is returned by write operations when TTL options
// (a refresh or an expiration time) are requested.
var errUnsupported = fmt.Errorf("TTLs are unsupported")
44
45func NewStore(c *clientv3.Client, pfx string) v2store.Store { return newStore(c, pfx) }
46
47func newStore(c *clientv3.Client, pfx string) *v2v3Store { return &v2v3Store{c, pfx, c.Ctx()} }
48
// Index is not implemented by the v2v3 store; calling it panics.
func (s *v2v3Store) Index() uint64 { panic("STUB") }
50
51func (s *v2v3Store) Get(nodePath string, recursive, sorted bool) (*v2store.Event, error) {
52	key := s.mkPath(nodePath)
53	resp, err := s.c.Txn(s.ctx).Then(
54		clientv3.OpGet(key+"/"),
55		clientv3.OpGet(key),
56	).Commit()
57	if err != nil {
58		return nil, err
59	}
60
61	if kvs := resp.Responses[0].GetResponseRange().Kvs; len(kvs) != 0 || isRoot(nodePath) {
62		nodes, err := s.getDir(nodePath, recursive, sorted, resp.Header.Revision)
63		if err != nil {
64			return nil, err
65		}
66		cidx, midx := uint64(0), uint64(0)
67		if len(kvs) > 0 {
68			cidx, midx = mkV2Rev(kvs[0].CreateRevision), mkV2Rev(kvs[0].ModRevision)
69		}
70		return &v2store.Event{
71			Action: v2store.Get,
72			Node: &v2store.NodeExtern{
73				Key:           nodePath,
74				Dir:           true,
75				Nodes:         nodes,
76				CreatedIndex:  cidx,
77				ModifiedIndex: midx,
78			},
79			EtcdIndex: mkV2Rev(resp.Header.Revision),
80		}, nil
81	}
82
83	kvs := resp.Responses[1].GetResponseRange().Kvs
84	if len(kvs) == 0 {
85		return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
86	}
87
88	return &v2store.Event{
89		Action:    v2store.Get,
90		Node:      s.mkV2Node(kvs[0]),
91		EtcdIndex: mkV2Rev(resp.Header.Revision),
92	}, nil
93}
94
95func (s *v2v3Store) getDir(nodePath string, recursive, sorted bool, rev int64) ([]*v2store.NodeExtern, error) {
96	rootNodes, err := s.getDirDepth(nodePath, 1, rev)
97	if err != nil || !recursive {
98		if sorted {
99			sort.Sort(v2store.NodeExterns(rootNodes))
100		}
101		return rootNodes, err
102	}
103	nextNodes := rootNodes
104	nodes := make(map[string]*v2store.NodeExtern)
105	// Breadth walk the subdirectories
106	for i := 2; len(nextNodes) > 0; i++ {
107		for _, n := range nextNodes {
108			nodes[n.Key] = n
109			if parent := nodes[path.Dir(n.Key)]; parent != nil {
110				parent.Nodes = append(parent.Nodes, n)
111			}
112		}
113		if nextNodes, err = s.getDirDepth(nodePath, i, rev); err != nil {
114			return nil, err
115		}
116	}
117
118	if sorted {
119		sort.Sort(v2store.NodeExterns(rootNodes))
120	}
121	return rootNodes, nil
122}
123
124func (s *v2v3Store) getDirDepth(nodePath string, depth int, rev int64) ([]*v2store.NodeExtern, error) {
125	pd := s.mkPathDepth(nodePath, depth)
126	resp, err := s.c.Get(s.ctx, pd, clientv3.WithPrefix(), clientv3.WithRev(rev))
127	if err != nil {
128		return nil, err
129	}
130
131	nodes := make([]*v2store.NodeExtern, len(resp.Kvs))
132	for i, kv := range resp.Kvs {
133		nodes[i] = s.mkV2Node(kv)
134	}
135	return nodes, nil
136}
137
138func (s *v2v3Store) Set(
139	nodePath string,
140	dir bool,
141	value string,
142	expireOpts v2store.TTLOptionSet,
143) (*v2store.Event, error) {
144	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
145		return nil, errUnsupported
146	}
147
148	if isRoot(nodePath) {
149		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
150	}
151
152	ecode := 0
153	applyf := func(stm concurrency.STM) error {
154		// build path if any directories in path do not exist
155		dirs := []string{}
156		for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
157			pp := s.mkPath(p)
158			if stm.Rev(pp) > 0 {
159				ecode = v2error.EcodeNotDir
160				return nil
161			}
162			if stm.Rev(pp+"/") == 0 {
163				dirs = append(dirs, pp+"/")
164			}
165		}
166		for _, d := range dirs {
167			stm.Put(d, "")
168		}
169
170		key := s.mkPath(nodePath)
171		if dir {
172			if stm.Rev(key) != 0 {
173				// exists as non-dir
174				ecode = v2error.EcodeNotDir
175				return nil
176			}
177			key = key + "/"
178		} else if stm.Rev(key+"/") != 0 {
179			ecode = v2error.EcodeNotFile
180			return nil
181		}
182		stm.Put(key, value, clientv3.WithPrevKV())
183		stm.Put(s.mkActionKey(), v2store.Set)
184		return nil
185	}
186
187	resp, err := s.newSTM(applyf)
188	if err != nil {
189		return nil, err
190	}
191	if ecode != 0 {
192		return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
193	}
194
195	createRev := resp.Header.Revision
196	var pn *v2store.NodeExtern
197	if pkv := prevKeyFromPuts(resp); pkv != nil {
198		pn = s.mkV2Node(pkv)
199		createRev = pkv.CreateRevision
200	}
201
202	vp := &value
203	if dir {
204		vp = nil
205	}
206	return &v2store.Event{
207		Action: v2store.Set,
208		Node: &v2store.NodeExtern{
209			Key:           nodePath,
210			Value:         vp,
211			Dir:           dir,
212			ModifiedIndex: mkV2Rev(resp.Header.Revision),
213			CreatedIndex:  mkV2Rev(createRev),
214		},
215		PrevNode:  pn,
216		EtcdIndex: mkV2Rev(resp.Header.Revision),
217	}, nil
218}
219
220func (s *v2v3Store) Update(nodePath, newValue string, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
221	if isRoot(nodePath) {
222		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
223	}
224
225	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
226		return nil, errUnsupported
227	}
228
229	key := s.mkPath(nodePath)
230	ecode := 0
231	applyf := func(stm concurrency.STM) error {
232		if rev := stm.Rev(key + "/"); rev != 0 {
233			ecode = v2error.EcodeNotFile
234			return nil
235		}
236		if rev := stm.Rev(key); rev == 0 {
237			ecode = v2error.EcodeKeyNotFound
238			return nil
239		}
240		stm.Put(key, newValue, clientv3.WithPrevKV())
241		stm.Put(s.mkActionKey(), v2store.Update)
242		return nil
243	}
244
245	resp, err := s.newSTM(applyf)
246	if err != nil {
247		return nil, err
248	}
249	if ecode != 0 {
250		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
251	}
252
253	pkv := prevKeyFromPuts(resp)
254	return &v2store.Event{
255		Action: v2store.Update,
256		Node: &v2store.NodeExtern{
257			Key:           nodePath,
258			Value:         &newValue,
259			ModifiedIndex: mkV2Rev(resp.Header.Revision),
260			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
261		},
262		PrevNode:  s.mkV2Node(pkv),
263		EtcdIndex: mkV2Rev(resp.Header.Revision),
264	}, nil
265}
266
267func (s *v2v3Store) Create(
268	nodePath string,
269	dir bool,
270	value string,
271	unique bool,
272	expireOpts v2store.TTLOptionSet,
273) (*v2store.Event, error) {
274	if isRoot(nodePath) {
275		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
276	}
277	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
278		return nil, errUnsupported
279	}
280	ecode := 0
281	applyf := func(stm concurrency.STM) error {
282		ecode = 0
283		key := s.mkPath(nodePath)
284		if unique {
285			// append unique item under the node path
286			for {
287				key = nodePath + "/" + fmt.Sprintf("%020s", time.Now())
288				key = path.Clean(path.Join("/", key))
289				key = s.mkPath(key)
290				if stm.Rev(key) == 0 {
291					break
292				}
293			}
294		}
295		if stm.Rev(key) > 0 || stm.Rev(key+"/") > 0 {
296			ecode = v2error.EcodeNodeExist
297			return nil
298		}
299		// build path if any directories in path do not exist
300		dirs := []string{}
301		for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
302			pp := s.mkPath(p)
303			if stm.Rev(pp) > 0 {
304				ecode = v2error.EcodeNotDir
305				return nil
306			}
307			if stm.Rev(pp+"/") == 0 {
308				dirs = append(dirs, pp+"/")
309			}
310		}
311		for _, d := range dirs {
312			stm.Put(d, "")
313		}
314
315		if dir {
316			// directories marked with extra slash in key name
317			key += "/"
318		}
319		stm.Put(key, value)
320		stm.Put(s.mkActionKey(), v2store.Create)
321		return nil
322	}
323
324	resp, err := s.newSTM(applyf)
325	if err != nil {
326		return nil, err
327	}
328	if ecode != 0 {
329		return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
330	}
331
332	var v *string
333	if !dir {
334		v = &value
335	}
336
337	return &v2store.Event{
338		Action: v2store.Create,
339		Node: &v2store.NodeExtern{
340			Key:           nodePath,
341			Value:         v,
342			Dir:           dir,
343			ModifiedIndex: mkV2Rev(resp.Header.Revision),
344			CreatedIndex:  mkV2Rev(resp.Header.Revision),
345		},
346		EtcdIndex: mkV2Rev(resp.Header.Revision),
347	}, nil
348}
349
350func (s *v2v3Store) CompareAndSwap(
351	nodePath string,
352	prevValue string,
353	prevIndex uint64,
354	value string,
355	expireOpts v2store.TTLOptionSet,
356) (*v2store.Event, error) {
357	if isRoot(nodePath) {
358		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
359	}
360	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
361		return nil, errUnsupported
362	}
363
364	key := s.mkPath(nodePath)
365	resp, err := s.c.Txn(s.ctx).If(
366		s.mkCompare(nodePath, prevValue, prevIndex)...,
367	).Then(
368		clientv3.OpPut(key, value, clientv3.WithPrevKV()),
369		clientv3.OpPut(s.mkActionKey(), v2store.CompareAndSwap),
370	).Else(
371		clientv3.OpGet(key),
372		clientv3.OpGet(key+"/"),
373	).Commit()
374
375	if err != nil {
376		return nil, err
377	}
378	if !resp.Succeeded {
379		return nil, compareFail(nodePath, prevValue, prevIndex, resp)
380	}
381
382	pkv := resp.Responses[0].GetResponsePut().PrevKv
383	return &v2store.Event{
384		Action: v2store.CompareAndSwap,
385		Node: &v2store.NodeExtern{
386			Key:           nodePath,
387			Value:         &value,
388			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
389			ModifiedIndex: mkV2Rev(resp.Header.Revision),
390		},
391		PrevNode:  s.mkV2Node(pkv),
392		EtcdIndex: mkV2Rev(resp.Header.Revision),
393	}, nil
394}
395
396func (s *v2v3Store) Delete(nodePath string, dir, recursive bool) (*v2store.Event, error) {
397	if isRoot(nodePath) {
398		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
399	}
400	if !dir && !recursive {
401		return s.deleteNode(nodePath)
402	}
403	if !recursive {
404		return s.deleteEmptyDir(nodePath)
405	}
406
407	dels := make([]clientv3.Op, maxPathDepth+1)
408	dels[0] = clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV())
409	for i := 1; i < maxPathDepth; i++ {
410		dels[i] = clientv3.OpDelete(s.mkPathDepth(nodePath, i), clientv3.WithPrefix())
411	}
412	dels[maxPathDepth] = clientv3.OpPut(s.mkActionKey(), v2store.Delete)
413
414	resp, err := s.c.Txn(s.ctx).If(
415		clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), ">", 0),
416		clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, maxPathDepth)+"/"), "=", 0),
417	).Then(
418		dels...,
419	).Commit()
420	if err != nil {
421		return nil, err
422	}
423	if !resp.Succeeded {
424		return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
425	}
426	dresp := resp.Responses[0].GetResponseDeleteRange()
427	return &v2store.Event{
428		Action:    v2store.Delete,
429		PrevNode:  s.mkV2Node(dresp.PrevKvs[0]),
430		EtcdIndex: mkV2Rev(resp.Header.Revision),
431	}, nil
432}
433
434func (s *v2v3Store) deleteEmptyDir(nodePath string) (*v2store.Event, error) {
435	resp, err := s.c.Txn(s.ctx).If(
436		clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, 1)), "=", 0).WithPrefix(),
437	).Then(
438		clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV()),
439		clientv3.OpPut(s.mkActionKey(), v2store.Delete),
440	).Commit()
441	if err != nil {
442		return nil, err
443	}
444	if !resp.Succeeded {
445		return nil, v2error.NewError(v2error.EcodeDirNotEmpty, nodePath, mkV2Rev(resp.Header.Revision))
446	}
447	dresp := resp.Responses[0].GetResponseDeleteRange()
448	if len(dresp.PrevKvs) == 0 {
449		return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
450	}
451	return &v2store.Event{
452		Action:    v2store.Delete,
453		PrevNode:  s.mkV2Node(dresp.PrevKvs[0]),
454		EtcdIndex: mkV2Rev(resp.Header.Revision),
455	}, nil
456}
457
458func (s *v2v3Store) deleteNode(nodePath string) (*v2store.Event, error) {
459	resp, err := s.c.Txn(s.ctx).If(
460		clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), "=", 0),
461	).Then(
462		clientv3.OpDelete(s.mkPath(nodePath), clientv3.WithPrevKV()),
463		clientv3.OpPut(s.mkActionKey(), v2store.Delete),
464	).Commit()
465	if err != nil {
466		return nil, err
467	}
468	if !resp.Succeeded {
469		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
470	}
471	pkvs := resp.Responses[0].GetResponseDeleteRange().PrevKvs
472	if len(pkvs) == 0 {
473		return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
474	}
475	pkv := pkvs[0]
476	return &v2store.Event{
477		Action: v2store.Delete,
478		Node: &v2store.NodeExtern{
479			Key:           nodePath,
480			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
481			ModifiedIndex: mkV2Rev(resp.Header.Revision),
482		},
483		PrevNode:  s.mkV2Node(pkv),
484		EtcdIndex: mkV2Rev(resp.Header.Revision),
485	}, nil
486}
487
488func (s *v2v3Store) CompareAndDelete(nodePath, prevValue string, prevIndex uint64) (*v2store.Event, error) {
489	if isRoot(nodePath) {
490		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
491	}
492
493	key := s.mkPath(nodePath)
494	resp, err := s.c.Txn(s.ctx).If(
495		s.mkCompare(nodePath, prevValue, prevIndex)...,
496	).Then(
497		clientv3.OpDelete(key, clientv3.WithPrevKV()),
498		clientv3.OpPut(s.mkActionKey(), v2store.CompareAndDelete),
499	).Else(
500		clientv3.OpGet(key),
501		clientv3.OpGet(key+"/"),
502	).Commit()
503
504	if err != nil {
505		return nil, err
506	}
507	if !resp.Succeeded {
508		return nil, compareFail(nodePath, prevValue, prevIndex, resp)
509	}
510
511	// len(pkvs) > 1 since txn only succeeds when key exists
512	pkv := resp.Responses[0].GetResponseDeleteRange().PrevKvs[0]
513	return &v2store.Event{
514		Action: v2store.CompareAndDelete,
515		Node: &v2store.NodeExtern{
516			Key:           nodePath,
517			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
518			ModifiedIndex: mkV2Rev(resp.Header.Revision),
519		},
520		PrevNode:  s.mkV2Node(pkv),
521		EtcdIndex: mkV2Rev(resp.Header.Revision),
522	}, nil
523}
524
525func compareFail(nodePath, prevValue string, prevIndex uint64, resp *clientv3.TxnResponse) error {
526	if dkvs := resp.Responses[1].GetResponseRange().Kvs; len(dkvs) > 0 {
527		return v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
528	}
529	kvs := resp.Responses[0].GetResponseRange().Kvs
530	if len(kvs) == 0 {
531		return v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
532	}
533	kv := kvs[0]
534	indexMatch := prevIndex == 0 || kv.ModRevision == int64(prevIndex)
535	valueMatch := prevValue == "" || string(kv.Value) == prevValue
536	var cause string
537	switch {
538	case indexMatch && !valueMatch:
539		cause = fmt.Sprintf("[%v != %v]", prevValue, string(kv.Value))
540	case valueMatch && !indexMatch:
541		cause = fmt.Sprintf("[%v != %v]", prevIndex, kv.ModRevision)
542	default:
543		cause = fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, string(kv.Value), prevIndex, kv.ModRevision)
544	}
545	return v2error.NewError(v2error.EcodeTestFailed, cause, mkV2Rev(resp.Header.Revision))
546}
547
548func (s *v2v3Store) mkCompare(nodePath, prevValue string, prevIndex uint64) []clientv3.Cmp {
549	key := s.mkPath(nodePath)
550	cmps := []clientv3.Cmp{clientv3.Compare(clientv3.Version(key), ">", 0)}
551	if prevIndex != 0 {
552		cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(key), "=", mkV3Rev(prevIndex)))
553	}
554	if prevValue != "" {
555		cmps = append(cmps, clientv3.Compare(clientv3.Value(key), "=", prevValue))
556	}
557	return cmps
558}
559
// JsonStats and DeleteExpiredKeys are not implemented by the v2v3 store;
// calling either one panics.
func (s *v2v3Store) JsonStats() []byte                  { panic("STUB") }
func (s *v2v3Store) DeleteExpiredKeys(cutoff time.Time) { panic("STUB") }
562
563func (s *v2v3Store) Version() int { return 2 }
564
565// TODO: move this out of the Store interface?
566
// Save, Recovery, Clone, SaveNoCopy and HasTTLKeys are not implemented
// by the v2v3 store; calling any of them panics.
func (s *v2v3Store) Save() ([]byte, error)       { panic("STUB") }
func (s *v2v3Store) Recovery(state []byte) error { panic("STUB") }
func (s *v2v3Store) Clone() v2store.Store        { panic("STUB") }
func (s *v2v3Store) SaveNoCopy() ([]byte, error) { panic("STUB") }
func (s *v2v3Store) HasTTLKeys() bool            { panic("STUB") }
572
573func (s *v2v3Store) mkPath(nodePath string) string { return s.mkPathDepth(nodePath, 0) }
574
575func (s *v2v3Store) mkNodePath(p string) string {
576	return path.Clean(p[len(s.pfx)+len("/k/000/"):])
577}
578
579// mkPathDepth makes a path to a key that encodes its directory depth
580// for fast directory listing. If a depth is provided, it is added
581// to the computed depth.
582func (s *v2v3Store) mkPathDepth(nodePath string, depth int) string {
583	normalForm := path.Clean(path.Join("/", nodePath))
584	n := strings.Count(normalForm, "/") + depth
585	return fmt.Sprintf("%s/%03d/k/%s", s.pfx, n, normalForm)
586}
587
588func (s *v2v3Store) mkActionKey() string { return s.pfx + "/act" }
589
// isRoot reports whether s names the root of the keyspace: the empty
// path, "/", "/0" or "/1".
func isRoot(s string) bool {
	switch s {
	case "", "/", "/0", "/1":
		return true
	default:
		return false
	}
}
591
// mkV2Rev converts a v3 revision to a v2 index. Zero stays zero; any
// other revision maps to the revision minus one.
func mkV2Rev(v3Rev int64) uint64 {
	if v3Rev != 0 {
		return uint64(v3Rev - 1)
	}
	return 0
}
598
// mkV3Rev converts a v2 index to a v3 revision. Zero stays zero; any
// other index maps to the index plus one.
func mkV3Rev(v2Rev uint64) int64 {
	if v2Rev != 0 {
		return int64(v2Rev + 1)
	}
	return 0
}
605
606// mkV2Node creates a V2 NodeExtern from a V3 KeyValue
607func (s *v2v3Store) mkV2Node(kv *mvccpb.KeyValue) *v2store.NodeExtern {
608	if kv == nil {
609		return nil
610	}
611	n := &v2store.NodeExtern{
612		Key:           s.mkNodePath(string(kv.Key)),
613		Dir:           kv.Key[len(kv.Key)-1] == '/',
614		CreatedIndex:  mkV2Rev(kv.CreateRevision),
615		ModifiedIndex: mkV2Rev(kv.ModRevision),
616	}
617	if !n.Dir {
618		v := string(kv.Value)
619		n.Value = &v
620	}
621	return n
622}
623
624// prevKeyFromPuts gets the prev key that is being put; ignores
625// the put action response.
626func prevKeyFromPuts(resp *clientv3.TxnResponse) *mvccpb.KeyValue {
627	for _, r := range resp.Responses {
628		pkv := r.GetResponsePut().PrevKv
629		if pkv != nil && pkv.CreateRevision > 0 {
630			return pkv
631		}
632	}
633	return nil
634}
635
636func (s *v2v3Store) newSTM(applyf func(concurrency.STM) error) (*clientv3.TxnResponse, error) {
637	return concurrency.NewSTM(s.c, applyf, concurrency.WithIsolation(concurrency.Serializable))
638}
639