1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package recipe 16 17import ( 18 "fmt" 19 "strings" 20 "time" 21 22 v3 "github.com/coreos/etcd/clientv3" 23 "github.com/coreos/etcd/clientv3/concurrency" 24 "golang.org/x/net/context" 25) 26 27// RemoteKV is a key/revision pair created by the client and stored on etcd 28type RemoteKV struct { 29 kv v3.KV 30 key string 31 rev int64 32 val string 33} 34 35func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) { 36 return newKV(kv, key, "", leaseID) 37} 38 39func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) { 40 rev, err := putNewKV(kv, key, val, leaseID) 41 if err != nil { 42 return nil, err 43 } 44 return &RemoteKV{kv, key, rev, val}, nil 45} 46 47func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) { 48 for { 49 newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) 50 rev, err := putNewKV(kv, newKey, val, 0) 51 if err == nil { 52 return &RemoteKV{kv, newKey, rev, val}, nil 53 } 54 if err != ErrKeyExists { 55 return nil, err 56 } 57 } 58} 59 60// putNewKV attempts to create the given key, only succeeding if the key did 61// not yet exist. 62func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) { 63 cmp := v3.Compare(v3.Version(key), "=", 0) 64 req := v3.OpPut(key, val, v3.WithLease(leaseID)) 65 txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit() 66 if err != nil { 67 return 0, err 68 } 69 if !txnresp.Succeeded { 70 return 0, ErrKeyExists 71 } 72 return txnresp.Header.Revision, nil 73} 74 75// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given 76// value and lease. Note: a bookkeeping node __<prefix> is also allocated. 77func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) { 78 resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...) 79 if err != nil { 80 return nil, err 81 } 82 83 // add 1 to last key, if any 84 newSeqNum := 0 85 if len(resp.Kvs) != 0 { 86 fields := strings.Split(string(resp.Kvs[0].Key), "/") 87 _, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum) 88 if serr != nil { 89 return nil, serr 90 } 91 newSeqNum++ 92 } 93 newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum) 94 95 // base prefix key must be current (i.e., <=) with the server update; 96 // the base key is important to avoid the following: 97 // N1: LastKey() == 1, start txn. 98 // N2: new Key 2, new Key 3, Delete Key 2 99 // N1: txn succeeds allocating key 2 when it shouldn't 100 baseKey := "__" + prefix 101 102 // current revision might contain modification so +1 103 cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1) 104 reqPrefix := v3.OpPut(baseKey, "") 105 reqnewKey := v3.OpPut(newKey, val) 106 107 txn := kv.Txn(context.TODO()) 108 txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit() 109 if err != nil { 110 return nil, err 111 } 112 if !txnresp.Succeeded { 113 return newSequentialKV(kv, prefix, val) 114 } 115 return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil 116} 117 118func (rk *RemoteKV) Key() string { return rk.key } 119func (rk *RemoteKV) Revision() int64 { return rk.rev } 120func (rk *RemoteKV) Value() string { return rk.val } 121 122func (rk *RemoteKV) Delete() error { 123 if rk.kv == nil { 124 return nil 125 } 126 _, err := rk.kv.Delete(context.TODO(), rk.key) 127 rk.kv = nil 128 return err 129} 130 131func (rk *RemoteKV) Put(val string) error { 132 _, err := rk.kv.Put(context.TODO(), rk.key, val) 133 return err 134} 135 136// EphemeralKV is a new key associated with a session lease 137type EphemeralKV struct{ RemoteKV } 138 139// newEphemeralKV creates a new key/value pair associated with a session lease 140func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) { 141 k, err := newKV(s.Client(), key, val, s.Lease()) 142 if err != nil { 143 return nil, err 144 } 145 return &EphemeralKV{*k}, nil 146} 147 148// newUniqueEphemeralKey creates a new unique valueless key associated with a session lease 149func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) { 150 return newUniqueEphemeralKV(s, prefix, "") 151} 152 153// newUniqueEphemeralKV creates a new unique key/value pair associated with a session lease 154func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) { 155 for { 156 newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) 157 ek, err = newEphemeralKV(s, newKey, val) 158 if err == nil || err != ErrKeyExists { 159 break 160 } 161 } 162 return ek, err 163} 164