1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package membership
16
17import (
18	"encoding/json"
19	"fmt"
20	"path"
21
22	"go.etcd.io/etcd/client/pkg/v3/types"
23	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
24	"go.etcd.io/etcd/server/v3/mvcc/backend"
25	"go.etcd.io/etcd/server/v3/mvcc/buckets"
26
27	"github.com/coreos/go-semver/semver"
28	"go.uber.org/zap"
29)
30
const (
	// attributesSuffix and raftAttributesSuffix are the last path elements of
	// the v2 store keys that hold a member's JSON-encoded Attributes and
	// RaftAttributes; they are joined onto MemberStoreKey(id).
	attributesSuffix     = "attributes"
	raftAttributesSuffix = "raftAttributes"

	// storePrefix is the prefix for storing membership related information
	// in the store provided by the store pkg.
	storePrefix = "/0"
)
38
var (
	// StoreMembersPrefix is the v2 store path under which the per-member keys
	// live (storePrefix joined with "members"); storeRemovedMembersPrefix is
	// the sibling path (storePrefix joined with "removed_members") under which
	// the keys of removed members are created.
	StoreMembersPrefix        = path.Join(storePrefix, "members")
	storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
43
44func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
45	mkey := backendMemberKey(m.ID)
46	mvalue, err := json.Marshal(m)
47	if err != nil {
48		lg.Panic("failed to marshal member", zap.Error(err))
49	}
50
51	tx := be.BatchTx()
52	tx.Lock()
53	defer tx.Unlock()
54	tx.UnsafePut(buckets.Members, mkey, mvalue)
55}
56
57// TrimClusterFromBackend removes all information about cluster (versions)
58// from the v3 backend.
59func TrimClusterFromBackend(be backend.Backend) error {
60	tx := be.BatchTx()
61	tx.Lock()
62	defer tx.Unlock()
63	tx.UnsafeDeleteBucket(buckets.Cluster)
64	return nil
65}
66
67func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
68	mkey := backendMemberKey(id)
69
70	tx := be.BatchTx()
71	tx.Lock()
72	defer tx.Unlock()
73	tx.UnsafeDelete(buckets.Members, mkey)
74	tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
75}
76
77func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
78	members := make(map[types.ID]*Member)
79	removed := make(map[types.ID]bool)
80
81	tx := be.ReadTx()
82	tx.RLock()
83	defer tx.RUnlock()
84	err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
85		memberId := mustParseMemberIDFromBytes(lg, k)
86		m := &Member{ID: memberId}
87		if err := json.Unmarshal(v, &m); err != nil {
88			return err
89		}
90		members[memberId] = m
91		return nil
92	})
93	if err != nil {
94		return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
95	}
96
97	err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
98		memberId := mustParseMemberIDFromBytes(lg, k)
99		removed[memberId] = true
100		return nil
101	})
102	if err != nil {
103		return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
104	}
105	return members, removed, nil
106}
107
108func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
109	members, removed, err := readMembersFromBackend(lg, be)
110	if err != nil {
111		lg.Panic("couldn't read members from backend", zap.Error(err))
112	}
113	return members, removed
114}
115
116// TrimMembershipFromBackend removes all information about members &
117// removed_members from the v3 backend.
118func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
119	lg.Info("Trimming membership information from the backend...")
120	tx := be.BatchTx()
121	tx.Lock()
122	defer tx.Unlock()
123	err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
124		tx.UnsafeDelete(buckets.Members, k)
125		lg.Debug("Removed member from the backend",
126			zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
127		return nil
128	})
129	if err != nil {
130		return err
131	}
132	return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
133		tx.UnsafeDelete(buckets.MembersRemoved, k)
134		lg.Debug("Removed removed_member from the backend",
135			zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
136		return nil
137	})
138}
139
140// TrimMembershipFromV2Store removes all information about members &
141// removed_members from the v2 store.
142func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
143	members, removed := membersFromStore(lg, s)
144
145	for mID := range members {
146		_, err := s.Delete(MemberStoreKey(mID), true, true)
147		if err != nil {
148			return err
149		}
150	}
151	for mID := range removed {
152		_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
153		if err != nil {
154			return err
155		}
156	}
157
158	return nil
159}
160
161// The field is populated since etcd v3.5.
162func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
163	ckey := backendClusterVersionKey()
164
165	tx := be.BatchTx()
166	tx.Lock()
167	defer tx.Unlock()
168	tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
169}
170
171// The field is populated since etcd v3.5.
172func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
173	dkey := backendDowngradeKey()
174	dvalue, err := json.Marshal(downgrade)
175	if err != nil {
176		lg.Panic("failed to marshal downgrade information", zap.Error(err))
177	}
178	tx := be.BatchTx()
179	tx.Lock()
180	defer tx.Unlock()
181	tx.UnsafePut(buckets.Cluster, dkey, dvalue)
182}
183
184func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
185	b, err := json.Marshal(m.RaftAttributes)
186	if err != nil {
187		lg.Panic("failed to marshal raftAttributes", zap.Error(err))
188	}
189	p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
190	if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
191		lg.Panic(
192			"failed to save member to store",
193			zap.String("path", p),
194			zap.Error(err),
195		)
196	}
197}
198
199func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
200	if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
201		lg.Panic(
202			"failed to delete member from store",
203			zap.String("path", MemberStoreKey(id)),
204			zap.Error(err),
205		)
206	}
207	if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
208		lg.Panic(
209			"failed to create removedMember",
210			zap.String("path", RemovedMemberStoreKey(id)),
211			zap.Error(err),
212		)
213	}
214}
215
216func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
217	b, err := json.Marshal(m.RaftAttributes)
218	if err != nil {
219		lg.Panic("failed to marshal raftAttributes", zap.Error(err))
220	}
221	p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
222	if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
223		lg.Panic(
224			"failed to update raftAttributes",
225			zap.String("path", p),
226			zap.Error(err),
227		)
228	}
229}
230
231func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
232	b, err := json.Marshal(m.Attributes)
233	if err != nil {
234		lg.Panic("failed to marshal attributes", zap.Error(err))
235	}
236	p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
237	if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
238		lg.Panic(
239			"failed to update attributes",
240			zap.String("path", p),
241			zap.Error(err),
242		)
243	}
244}
245
246func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
247	if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
248		lg.Panic(
249			"failed to save cluster version to store",
250			zap.String("path", StoreClusterVersionKey()),
251			zap.Error(err),
252		)
253	}
254}
255
256// nodeToMember builds member from a key value node.
257// the child nodes of the given node MUST be sorted by key.
258func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
259	m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
260	attrs := make(map[string][]byte)
261	raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
262	attrKey := path.Join(n.Key, attributesSuffix)
263	for _, nn := range n.Nodes {
264		if nn.Key != raftAttrKey && nn.Key != attrKey {
265			return nil, fmt.Errorf("unknown key %q", nn.Key)
266		}
267		attrs[nn.Key] = []byte(*nn.Value)
268	}
269	if data := attrs[raftAttrKey]; data != nil {
270		if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
271			return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
272		}
273	} else {
274		return nil, fmt.Errorf("raftAttributes key doesn't exist")
275	}
276	if data := attrs[attrKey]; data != nil {
277		if err := json.Unmarshal(data, &m.Attributes); err != nil {
278			return m, fmt.Errorf("unmarshal attributes error: %v", err)
279		}
280	}
281	return m, nil
282}
283
284func backendMemberKey(id types.ID) []byte {
285	return []byte(id.String())
286}
287
// backendClusterVersionKey returns the backend key under which the cluster
// version is stored.
func backendClusterVersionKey() []byte {
	const key = "clusterVersion"
	return []byte(key)
}
291
// backendDowngradeKey returns the backend key under which the downgrade
// information is stored.
func backendDowngradeKey() []byte {
	const key = "downgrade"
	return []byte(key)
}
295
296func mustCreateBackendBuckets(be backend.Backend) {
297	tx := be.BatchTx()
298	tx.Lock()
299	defer tx.Unlock()
300	tx.UnsafeCreateBucket(buckets.Members)
301	tx.UnsafeCreateBucket(buckets.MembersRemoved)
302	tx.UnsafeCreateBucket(buckets.Cluster)
303}
304
305func MemberStoreKey(id types.ID) string {
306	return path.Join(StoreMembersPrefix, id.String())
307}
308
309func StoreClusterVersionKey() string {
310	return path.Join(storePrefix, "version")
311}
312
313func MemberAttributesStorePath(id types.ID) string {
314	return path.Join(MemberStoreKey(id), attributesSuffix)
315}
316
317func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
318	id, err := types.IDFromString(string(key))
319	if err != nil {
320		lg.Panic("failed to parse member id from key", zap.Error(err))
321	}
322	return id
323}
324
325func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
326	id, err := types.IDFromString(path.Base(key))
327	if err != nil {
328		lg.Panic("failed to parse member id from key", zap.Error(err))
329	}
330	return id
331}
332
333func RemovedMemberStoreKey(id types.ID) string {
334	return path.Join(storeRemovedMembersPrefix, id.String())
335}
336