1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package recipe
16
17import (
18	"fmt"
19	"strings"
20	"time"
21
22	v3 "github.com/coreos/etcd/clientv3"
23	"github.com/coreos/etcd/clientv3/concurrency"
24	"golang.org/x/net/context"
25)
26
27// RemoteKV is a key/revision pair created by the client and stored on etcd
28type RemoteKV struct {
29	kv  v3.KV
30	key string
31	rev int64
32	val string
33}
34
35func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) {
36	return newKV(kv, key, "", leaseID)
37}
38
39func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
40	rev, err := putNewKV(kv, key, val, leaseID)
41	if err != nil {
42		return nil, err
43	}
44	return &RemoteKV{kv, key, rev, val}, nil
45}
46
47func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
48	for {
49		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
50		rev, err := putNewKV(kv, newKey, val, 0)
51		if err == nil {
52			return &RemoteKV{kv, newKey, rev, val}, nil
53		}
54		if err != ErrKeyExists {
55			return nil, err
56		}
57	}
58}
59
60// putNewKV attempts to create the given key, only succeeding if the key did
61// not yet exist.
62func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
63	cmp := v3.Compare(v3.Version(key), "=", 0)
64	req := v3.OpPut(key, val, v3.WithLease(leaseID))
65	txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
66	if err != nil {
67		return 0, err
68	}
69	if !txnresp.Succeeded {
70		return 0, ErrKeyExists
71	}
72	return txnresp.Header.Revision, nil
73}
74
75// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
76// value and lease.  Note: a bookkeeping node __<prefix> is also allocated.
77func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
78	resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...)
79	if err != nil {
80		return nil, err
81	}
82
83	// add 1 to last key, if any
84	newSeqNum := 0
85	if len(resp.Kvs) != 0 {
86		fields := strings.Split(string(resp.Kvs[0].Key), "/")
87		_, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
88		if serr != nil {
89			return nil, serr
90		}
91		newSeqNum++
92	}
93	newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
94
95	// base prefix key must be current (i.e., <=) with the server update;
96	// the base key is important to avoid the following:
97	// N1: LastKey() == 1, start txn.
98	// N2: new Key 2, new Key 3, Delete Key 2
99	// N1: txn succeeds allocating key 2 when it shouldn't
100	baseKey := "__" + prefix
101
102	// current revision might contain modification so +1
103	cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1)
104	reqPrefix := v3.OpPut(baseKey, "")
105	reqnewKey := v3.OpPut(newKey, val)
106
107	txn := kv.Txn(context.TODO())
108	txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit()
109	if err != nil {
110		return nil, err
111	}
112	if !txnresp.Succeeded {
113		return newSequentialKV(kv, prefix, val)
114	}
115	return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
116}
117
118func (rk *RemoteKV) Key() string     { return rk.key }
119func (rk *RemoteKV) Revision() int64 { return rk.rev }
120func (rk *RemoteKV) Value() string   { return rk.val }
121
122func (rk *RemoteKV) Delete() error {
123	if rk.kv == nil {
124		return nil
125	}
126	_, err := rk.kv.Delete(context.TODO(), rk.key)
127	rk.kv = nil
128	return err
129}
130
131func (rk *RemoteKV) Put(val string) error {
132	_, err := rk.kv.Put(context.TODO(), rk.key, val)
133	return err
134}
135
136// EphemeralKV is a new key associated with a session lease
137type EphemeralKV struct{ RemoteKV }
138
139// newEphemeralKV creates a new key/value pair associated with a session lease
140func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
141	k, err := newKV(s.Client(), key, val, s.Lease())
142	if err != nil {
143		return nil, err
144	}
145	return &EphemeralKV{*k}, nil
146}
147
148// newUniqueEphemeralKey creates a new unique valueless key associated with a session lease
149func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
150	return newUniqueEphemeralKV(s, prefix, "")
151}
152
153// newUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
154func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
155	for {
156		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
157		ek, err = newEphemeralKV(s, newKey, val)
158		if err == nil || err != ErrKeyExists {
159			break
160		}
161	}
162	return ek, err
163}
164