1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package membership 16 17import ( 18 "encoding/json" 19 "fmt" 20 "path" 21 22 "go.etcd.io/etcd/pkg/v3/types" 23 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" 24 "go.etcd.io/etcd/server/v3/mvcc/backend" 25 26 "github.com/coreos/go-semver/semver" 27 "go.uber.org/zap" 28) 29 30const ( 31 attributesSuffix = "attributes" 32 raftAttributesSuffix = "raftAttributes" 33 34 // the prefix for stroing membership related information in store provided by store pkg. 35 storePrefix = "/0" 36) 37 38var ( 39 membersBucketName = []byte("members") 40 membersRemovedBucketName = []byte("members_removed") 41 clusterBucketName = []byte("cluster") 42 43 StoreMembersPrefix = path.Join(storePrefix, "members") 44 storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") 45) 46 47func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { 48 mkey := backendMemberKey(m.ID) 49 mvalue, err := json.Marshal(m) 50 if err != nil { 51 lg.Panic("failed to marshal member", zap.Error(err)) 52 } 53 54 tx := be.BatchTx() 55 tx.Lock() 56 defer tx.Unlock() 57 tx.UnsafePut(membersBucketName, mkey, mvalue) 58} 59 60func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { 61 mkey := backendMemberKey(id) 62 63 tx := be.BatchTx() 64 tx.Lock() 65 defer tx.Unlock() 66 tx.UnsafeDelete(membersBucketName, mkey) 67 tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed")) 68} 69 70func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { 71 ckey := backendClusterVersionKey() 72 73 tx := be.BatchTx() 74 tx.Lock() 75 defer tx.Unlock() 76 tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String())) 77} 78 79func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) { 80 dkey := backendDowngradeKey() 81 dvalue, err := json.Marshal(downgrade) 82 if err != nil { 83 lg.Panic("failed to marshal downgrade information", zap.Error(err)) 84 } 85 tx := be.BatchTx() 86 tx.Lock() 87 defer tx.Unlock() 88 tx.UnsafePut(clusterBucketName, dkey, dvalue) 89} 90 91func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) { 92 b, err := json.Marshal(m.RaftAttributes) 93 if err != nil { 94 lg.Panic("failed to marshal raftAttributes", zap.Error(err)) 95 } 96 p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) 97 if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 98 lg.Panic( 99 "failed to save member to store", 100 zap.String("path", p), 101 zap.Error(err), 102 ) 103 } 104} 105 106func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) { 107 if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { 108 lg.Panic( 109 "failed to delete member from store", 110 zap.String("path", MemberStoreKey(id)), 111 zap.Error(err), 112 ) 113 } 114 if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 115 lg.Panic( 116 "failed to create removedMember", 117 zap.String("path", RemovedMemberStoreKey(id)), 118 zap.Error(err), 119 ) 120 } 121} 122 123func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) { 124 b, err := json.Marshal(m.RaftAttributes) 125 if err != nil { 126 lg.Panic("failed to marshal raftAttributes", zap.Error(err)) 127 } 128 p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) 129 if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 130 lg.Panic( 131 "failed to update raftAttributes", 132 zap.String("path", p), 133 zap.Error(err), 134 ) 135 } 136} 137 138func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) { 139 b, err := json.Marshal(m.Attributes) 140 if err != nil { 141 lg.Panic("failed to marshal attributes", zap.Error(err)) 142 } 143 p := path.Join(MemberStoreKey(m.ID), attributesSuffix) 144 if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 145 lg.Panic( 146 "failed to update attributes", 147 zap.String("path", p), 148 zap.Error(err), 149 ) 150 } 151} 152 153func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) { 154 if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 155 lg.Panic( 156 "failed to save cluster version to store", 157 zap.String("path", StoreClusterVersionKey()), 158 zap.Error(err), 159 ) 160 } 161} 162 163// nodeToMember builds member from a key value node. 164// the child nodes of the given node MUST be sorted by key. 165func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) { 166 m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)} 167 attrs := make(map[string][]byte) 168 raftAttrKey := path.Join(n.Key, raftAttributesSuffix) 169 attrKey := path.Join(n.Key, attributesSuffix) 170 for _, nn := range n.Nodes { 171 if nn.Key != raftAttrKey && nn.Key != attrKey { 172 return nil, fmt.Errorf("unknown key %q", nn.Key) 173 } 174 attrs[nn.Key] = []byte(*nn.Value) 175 } 176 if data := attrs[raftAttrKey]; data != nil { 177 if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { 178 return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) 179 } 180 } else { 181 return nil, fmt.Errorf("raftAttributes key doesn't exist") 182 } 183 if data := attrs[attrKey]; data != nil { 184 if err := json.Unmarshal(data, &m.Attributes); err != nil { 185 return m, fmt.Errorf("unmarshal attributes error: %v", err) 186 } 187 } 188 return m, nil 189} 190 191func backendMemberKey(id types.ID) []byte { 192 return []byte(id.String()) 193} 194 195func backendClusterVersionKey() []byte { 196 return []byte("clusterVersion") 197} 198 199func backendDowngradeKey() []byte { 200 return []byte("downgrade") 201} 202 203func mustCreateBackendBuckets(be backend.Backend) { 204 tx := be.BatchTx() 205 tx.Lock() 206 defer tx.Unlock() 207 tx.UnsafeCreateBucket(membersBucketName) 208 tx.UnsafeCreateBucket(membersRemovedBucketName) 209 tx.UnsafeCreateBucket(clusterBucketName) 210} 211 212func MemberStoreKey(id types.ID) string { 213 return path.Join(StoreMembersPrefix, id.String()) 214} 215 216func StoreClusterVersionKey() string { 217 return path.Join(storePrefix, "version") 218} 219 220func MemberAttributesStorePath(id types.ID) string { 221 return path.Join(MemberStoreKey(id), attributesSuffix) 222} 223 224func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID { 225 id, err := types.IDFromString(path.Base(key)) 226 if err != nil { 227 lg.Panic("failed to parse memver id from key", zap.Error(err)) 228 } 229 return id 230} 231 232func RemovedMemberStoreKey(id types.ID) string { 233 return path.Join(storeRemovedMembersPrefix, id.String()) 234} 235