1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package membership 16 17import ( 18 "encoding/json" 19 "fmt" 20 "path" 21 22 "go.etcd.io/etcd/client/pkg/v3/types" 23 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" 24 "go.etcd.io/etcd/server/v3/mvcc/backend" 25 "go.etcd.io/etcd/server/v3/mvcc/buckets" 26 27 "github.com/coreos/go-semver/semver" 28 "go.uber.org/zap" 29) 30 31const ( 32 attributesSuffix = "attributes" 33 raftAttributesSuffix = "raftAttributes" 34 35 // the prefix for storing membership related information in store provided by store pkg. 36 storePrefix = "/0" 37) 38 39var ( 40 StoreMembersPrefix = path.Join(storePrefix, "members") 41 storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") 42) 43 44func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { 45 mkey := backendMemberKey(m.ID) 46 mvalue, err := json.Marshal(m) 47 if err != nil { 48 lg.Panic("failed to marshal member", zap.Error(err)) 49 } 50 51 tx := be.BatchTx() 52 tx.Lock() 53 defer tx.Unlock() 54 tx.UnsafePut(buckets.Members, mkey, mvalue) 55} 56 57// TrimClusterFromBackend removes all information about cluster (versions) 58// from the v3 backend. 59func TrimClusterFromBackend(be backend.Backend) error { 60 tx := be.BatchTx() 61 tx.Lock() 62 defer tx.Unlock() 63 tx.UnsafeDeleteBucket(buckets.Cluster) 64 return nil 65} 66 67func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { 68 mkey := backendMemberKey(id) 69 70 tx := be.BatchTx() 71 tx.Lock() 72 defer tx.Unlock() 73 tx.UnsafeDelete(buckets.Members, mkey) 74 tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed")) 75} 76 77func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) { 78 members := make(map[types.ID]*Member) 79 removed := make(map[types.ID]bool) 80 81 tx := be.ReadTx() 82 tx.RLock() 83 defer tx.RUnlock() 84 err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { 85 memberId := mustParseMemberIDFromBytes(lg, k) 86 m := &Member{ID: memberId} 87 if err := json.Unmarshal(v, &m); err != nil { 88 return err 89 } 90 members[memberId] = m 91 return nil 92 }) 93 if err != nil { 94 return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err) 95 } 96 97 err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error { 98 memberId := mustParseMemberIDFromBytes(lg, k) 99 removed[memberId] = true 100 return nil 101 }) 102 if err != nil { 103 return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err) 104 } 105 return members, removed, nil 106} 107 108func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) { 109 members, removed, err := readMembersFromBackend(lg, be) 110 if err != nil { 111 lg.Panic("couldn't read members from backend", zap.Error(err)) 112 } 113 return members, removed 114} 115 116// TrimMembershipFromBackend removes all information about members & 117// removed_members from the v3 backend. 118func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error { 119 lg.Info("Trimming membership information from the backend...") 120 tx := be.BatchTx() 121 tx.Lock() 122 defer tx.Unlock() 123 err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { 124 tx.UnsafeDelete(buckets.Members, k) 125 lg.Debug("Removed member from the backend", 126 zap.Stringer("member", mustParseMemberIDFromBytes(lg, k))) 127 return nil 128 }) 129 if err != nil { 130 return err 131 } 132 return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error { 133 tx.UnsafeDelete(buckets.MembersRemoved, k) 134 lg.Debug("Removed removed_member from the backend", 135 zap.Stringer("member", mustParseMemberIDFromBytes(lg, k))) 136 return nil 137 }) 138} 139 140// TrimMembershipFromV2Store removes all information about members & 141// removed_members from the v2 store. 142func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error { 143 members, removed := membersFromStore(lg, s) 144 145 for mID := range members { 146 _, err := s.Delete(MemberStoreKey(mID), true, true) 147 if err != nil { 148 return err 149 } 150 } 151 for mID := range removed { 152 _, err := s.Delete(RemovedMemberStoreKey(mID), true, true) 153 if err != nil { 154 return err 155 } 156 } 157 158 return nil 159} 160 161// The field is populated since etcd v3.5. 162func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { 163 ckey := backendClusterVersionKey() 164 165 tx := be.BatchTx() 166 tx.Lock() 167 defer tx.Unlock() 168 tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String())) 169} 170 171// The field is populated since etcd v3.5. 172func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) { 173 dkey := backendDowngradeKey() 174 dvalue, err := json.Marshal(downgrade) 175 if err != nil { 176 lg.Panic("failed to marshal downgrade information", zap.Error(err)) 177 } 178 tx := be.BatchTx() 179 tx.Lock() 180 defer tx.Unlock() 181 tx.UnsafePut(buckets.Cluster, dkey, dvalue) 182} 183 184func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) { 185 b, err := json.Marshal(m.RaftAttributes) 186 if err != nil { 187 lg.Panic("failed to marshal raftAttributes", zap.Error(err)) 188 } 189 p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) 190 if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 191 lg.Panic( 192 "failed to save member to store", 193 zap.String("path", p), 194 zap.Error(err), 195 ) 196 } 197} 198 199func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) { 200 if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { 201 lg.Panic( 202 "failed to delete member from store", 203 zap.String("path", MemberStoreKey(id)), 204 zap.Error(err), 205 ) 206 } 207 if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 208 lg.Panic( 209 "failed to create removedMember", 210 zap.String("path", RemovedMemberStoreKey(id)), 211 zap.Error(err), 212 ) 213 } 214} 215 216func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) { 217 b, err := json.Marshal(m.RaftAttributes) 218 if err != nil { 219 lg.Panic("failed to marshal raftAttributes", zap.Error(err)) 220 } 221 p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) 222 if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 223 lg.Panic( 224 "failed to update raftAttributes", 225 zap.String("path", p), 226 zap.Error(err), 227 ) 228 } 229} 230 231func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) { 232 b, err := json.Marshal(m.Attributes) 233 if err != nil { 234 lg.Panic("failed to marshal attributes", zap.Error(err)) 235 } 236 p := path.Join(MemberStoreKey(m.ID), attributesSuffix) 237 if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 238 lg.Panic( 239 "failed to update attributes", 240 zap.String("path", p), 241 zap.Error(err), 242 ) 243 } 244} 245 246func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) { 247 if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { 248 lg.Panic( 249 "failed to save cluster version to store", 250 zap.String("path", StoreClusterVersionKey()), 251 zap.Error(err), 252 ) 253 } 254} 255 256// nodeToMember builds member from a key value node. 257// the child nodes of the given node MUST be sorted by key. 258func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) { 259 m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)} 260 attrs := make(map[string][]byte) 261 raftAttrKey := path.Join(n.Key, raftAttributesSuffix) 262 attrKey := path.Join(n.Key, attributesSuffix) 263 for _, nn := range n.Nodes { 264 if nn.Key != raftAttrKey && nn.Key != attrKey { 265 return nil, fmt.Errorf("unknown key %q", nn.Key) 266 } 267 attrs[nn.Key] = []byte(*nn.Value) 268 } 269 if data := attrs[raftAttrKey]; data != nil { 270 if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { 271 return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) 272 } 273 } else { 274 return nil, fmt.Errorf("raftAttributes key doesn't exist") 275 } 276 if data := attrs[attrKey]; data != nil { 277 if err := json.Unmarshal(data, &m.Attributes); err != nil { 278 return m, fmt.Errorf("unmarshal attributes error: %v", err) 279 } 280 } 281 return m, nil 282} 283 284func backendMemberKey(id types.ID) []byte { 285 return []byte(id.String()) 286} 287 288func backendClusterVersionKey() []byte { 289 return []byte("clusterVersion") 290} 291 292func backendDowngradeKey() []byte { 293 return []byte("downgrade") 294} 295 296func mustCreateBackendBuckets(be backend.Backend) { 297 tx := be.BatchTx() 298 tx.Lock() 299 defer tx.Unlock() 300 tx.UnsafeCreateBucket(buckets.Members) 301 tx.UnsafeCreateBucket(buckets.MembersRemoved) 302 tx.UnsafeCreateBucket(buckets.Cluster) 303} 304 305func MemberStoreKey(id types.ID) string { 306 return path.Join(StoreMembersPrefix, id.String()) 307} 308 309func StoreClusterVersionKey() string { 310 return path.Join(storePrefix, "version") 311} 312 313func MemberAttributesStorePath(id types.ID) string { 314 return path.Join(MemberStoreKey(id), attributesSuffix) 315} 316 317func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID { 318 id, err := types.IDFromString(string(key)) 319 if err != nil { 320 lg.Panic("failed to parse member id from key", zap.Error(err)) 321 } 322 return id 323} 324 325func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID { 326 id, err := types.IDFromString(path.Base(key)) 327 if err != nil { 328 lg.Panic("failed to parse member id from key", zap.Error(err)) 329 } 330 return id 331} 332 333func RemovedMemberStoreKey(id types.ID) string { 334 return path.Join(storeRemovedMembersPrefix, id.String()) 335} 336